Skip to main content

r/interpreter/builtins/
connections.rs

1//! Connection object builtins — file connections, stdin/stdout/stderr, TCP sockets.
2//!
3//! R connections wrap file handles (or other I/O sources) behind integer IDs.
4//! This module provides: `file()`, `open()`, `close()`, `isOpen()`,
5//! `readLines()`/`writeLines()` connection dispatch, the three standard stream
6//! constructors `stdin()`, `stdout()`, `stderr()`, and TCP client socket
7//! builtins: `make.socket()`, `read.socket()`, `write.socket()`, `close.socket()`.
8//!
9//! Connection metadata lives on the `Interpreter` struct as a `Vec<ConnectionInfo>`.
10//! Slots 0-2 are pre-allocated for stdin, stdout, and stderr.
11//!
12//! TCP streams (`std::net::TcpStream`) are not `Clone`, so they are stored
13//! separately in a `HashMap<usize, TcpStream>` keyed by connection ID.
14
15use std::io::{Read, Write};
16use std::net::{Shutdown, TcpStream};
17
18use bstr::ByteSlice;
19
20use super::CallArgs;
21use crate::interpreter::value::*;
22use crate::interpreter::BuiltinContext;
23use crate::interpreter::Interpreter;
24use itertools::Itertools;
25use minir_macros::interpreter_builtin;
26
27// region: ConnectionKind + ConnectionInfo
28
29fn resolved_path_string(context: &BuiltinContext, path: &str) -> String {
30    context
31        .interpreter()
32        .resolve_path(path)
33        .to_string_lossy()
34        .to_string()
35}
36
/// The I/O backing behind a connection slot.
///
/// The variant decides where the actual data for a connection lives and
/// which builtins are willing to operate on it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionKind {
    /// One of the standard streams (stdin, stdout or stderr).
    StdStream,
    /// A file connection; the file path is kept in `ConnectionInfo::path`.
    File,
    /// A TCP client socket. The `TcpStream` handle itself is kept in
    /// `Interpreter::tcp_streams` under the connection ID.
    TcpClient,
    /// A URL connection (HTTP/HTTPS). The fetched body is kept in
    /// `Interpreter::url_bodies` under the connection ID.
    Url,
}
51
52/// Describes a single connection slot in the interpreter.
53#[derive(Debug, Clone)]
54pub struct ConnectionInfo {
55    /// File path (empty for stdin/stdout/stderr and TCP sockets).
56    pub path: String,
57    /// Open mode string (e.g. "r", "w", "rt", "wt"). Empty means not yet opened.
58    pub mode: String,
59    /// Whether the connection is currently open.
60    pub is_open: bool,
61    /// Human-readable description (e.g. "stdin", "stdout", the file path, or "host:port").
62    pub description: String,
63    /// What kind of I/O this connection wraps.
64    pub kind: ConnectionKind,
65}
66
67impl ConnectionInfo {
68    /// Create a new file connection with the given path and description, initially closed.
69    fn new(path: String, description: String) -> Self {
70        Self {
71            path,
72            mode: String::new(),
73            is_open: false,
74            description,
75            kind: ConnectionKind::File,
76        }
77    }
78
79    /// Create a standard stream connection (pre-opened).
80    fn std_stream(description: &str) -> Self {
81        Self {
82            path: String::new(),
83            mode: String::new(),
84            is_open: true,
85            description: description.to_string(),
86            kind: ConnectionKind::StdStream,
87        }
88    }
89
90    /// Create a TCP client socket connection (pre-opened).
91    fn tcp_client(description: String) -> Self {
92        Self {
93            path: String::new(),
94            mode: "a+".to_string(),
95            is_open: true,
96            description,
97            kind: ConnectionKind::TcpClient,
98        }
99    }
100
101    /// Create a URL connection, initially closed.
102    pub fn url_connection(url: String) -> Self {
103        Self {
104            path: url.clone(),
105            mode: String::new(),
106            is_open: false,
107            description: url,
108            kind: ConnectionKind::Url,
109        }
110    }
111}
112
113// endregion
114
115// region: Interpreter connection helpers
116
117impl Interpreter {
118    /// Ensure the connections table is initialised with the three standard streams.
119    /// Called lazily on first access.
120    pub(crate) fn ensure_connections(&self) {
121        let mut conns = self.connections.borrow_mut();
122        if conns.is_empty() {
123            conns.push(ConnectionInfo::std_stream("stdin"));
124            conns.push(ConnectionInfo::std_stream("stdout"));
125            conns.push(ConnectionInfo::std_stream("stderr"));
126        }
127    }
128
129    /// Allocate a new connection slot, returning its integer ID.
130    pub(crate) fn add_connection(&self, info: ConnectionInfo) -> usize {
131        self.ensure_connections();
132        let mut conns = self.connections.borrow_mut();
133        let id = conns.len();
134        conns.push(info);
135        id
136    }
137
138    /// Get a clone of the connection info at `id`, or None if out of range.
139    pub(crate) fn get_connection(&self, id: usize) -> Option<ConnectionInfo> {
140        self.ensure_connections();
141        self.connections.borrow().get(id).cloned()
142    }
143
144    /// Mutate a connection in place. Returns an error if the ID is invalid.
145    pub(crate) fn with_connection_mut<F>(&self, id: usize, f: F) -> Result<(), RError>
146    where
147        F: FnOnce(&mut ConnectionInfo),
148    {
149        self.ensure_connections();
150        let mut conns = self.connections.borrow_mut();
151        let conn = conns.get_mut(id).ok_or_else(|| {
152            RError::new(RErrorKind::Argument, format!("invalid connection id {id}"))
153        })?;
154        f(conn);
155        Ok(())
156    }
157
158    /// Store a TCP stream for the given connection ID.
159    pub(crate) fn store_tcp_stream(&self, id: usize, stream: TcpStream) {
160        self.tcp_streams.borrow_mut().insert(id, stream);
161    }
162
163    /// Remove and return a TCP stream for the given connection ID, if present.
164    pub(crate) fn take_tcp_stream(&self, id: usize) -> Option<TcpStream> {
165        self.tcp_streams.borrow_mut().remove(&id)
166    }
167
168    /// Execute a closure with mutable access to the TCP stream for `id`.
169    /// Returns an error if no TCP stream exists for that connection.
170    pub(crate) fn with_tcp_stream<F, T>(&self, id: usize, f: F) -> Result<T, RError>
171    where
172        F: FnOnce(&mut TcpStream) -> Result<T, RError>,
173    {
174        let mut streams = self.tcp_streams.borrow_mut();
175        let stream = streams.get_mut(&id).ok_or_else(|| {
176            RError::new(
177                RErrorKind::Argument,
178                format!("connection {id} does not have an active TCP stream"),
179            )
180        })?;
181        f(stream)
182    }
183
184    /// Store a fetched URL response body for the given connection ID.
185    #[cfg(feature = "tls")]
186    pub(crate) fn store_url_body(&self, id: usize, body: Vec<u8>) {
187        self.url_bodies.borrow_mut().insert(id, body);
188    }
189
190    /// Take (remove) the URL response body for the given connection ID.
191    #[cfg(feature = "tls")]
192    pub(crate) fn take_url_body(&self, id: usize) -> Option<Vec<u8>> {
193        self.url_bodies.borrow_mut().remove(&id)
194    }
195
196    /// Get a clone of the URL response body for the given connection ID.
197    #[cfg(feature = "tls")]
198    pub(crate) fn get_url_body(&self, id: usize) -> Option<Vec<u8>> {
199        self.url_bodies.borrow().get(&id).cloned()
200    }
201}
202
203// endregion
204
205// region: Helper — build a connection RValue
206
207/// Build an integer scalar with class `"connection"` representing connection `id`.
208fn connection_value(id: usize) -> RValue {
209    let mut rv = RVector::from(Vector::Integer(
210        vec![Some(i64::try_from(id).unwrap_or(0))].into(),
211    ));
212    rv.set_attr(
213        "class".to_string(),
214        RValue::vec(Vector::Character(
215            vec![Some("connection".to_string())].into(),
216        )),
217    );
218    RValue::Vector(rv)
219}
220
221/// Extract a connection ID from an argument that is either an integer (possibly
222/// with class "connection") or a double that can be losslessly converted.
223fn connection_id(val: &RValue) -> Option<usize> {
224    val.as_vector()
225        .and_then(|v| v.as_integer_scalar())
226        .and_then(|i| usize::try_from(i).ok())
227}
228
229/// Returns `true` if `val` carries the `"connection"` class attribute.
230fn is_connection(val: &RValue) -> bool {
231    match val {
232        RValue::Vector(rv) => rv
233            .class()
234            .map(|cls| cls.iter().any(|c| c == "connection"))
235            .unwrap_or(false),
236        _ => false,
237    }
238}
239
240// endregion
241
242// region: File/stream builtins
243
244/// Create a file connection.
245///
246/// Returns an integer connection ID with class "connection". The connection
247/// is not opened unless `open` is non-empty.
248///
249/// @param description character scalar: file path
250/// @param open character scalar: open mode ("" means unopened, "r", "w", etc.)
251/// @return integer scalar with class "connection"
252#[interpreter_builtin(name = "file", min_args = 1)]
253fn interp_file(
254    args: &[RValue],
255    named: &[(String, RValue)],
256    context: &BuiltinContext,
257) -> Result<RValue, RError> {
258    let call_args = CallArgs::new(args, named);
259    let path = call_args.string("description", 0)?;
260    let open_mode = call_args.optional_string("open", 1).unwrap_or_default();
261
262    let interp = context.interpreter();
263    let mut info = ConnectionInfo::new(path.clone(), path);
264    if !open_mode.is_empty() {
265        info.mode = open_mode;
266        info.is_open = true;
267    }
268    let id = interp.add_connection(info);
269    Ok(connection_value(id))
270}
271
272/// Open a connection.
273///
274/// If the connection is already open this is a no-op. Otherwise the mode is
275/// recorded and the connection is marked open.
276///
277/// @param con integer scalar: connection ID
278/// @param open character scalar: open mode (default "r")
279/// @return the connection (invisibly)
280#[interpreter_builtin(name = "open", min_args = 1)]
281fn interp_open(
282    args: &[RValue],
283    named: &[(String, RValue)],
284    context: &BuiltinContext,
285) -> Result<RValue, RError> {
286    let call_args = CallArgs::new(args, named);
287    let con_val = call_args.value("con", 0).ok_or_else(|| {
288        RError::new(
289            RErrorKind::Argument,
290            "argument 'con' is missing".to_string(),
291        )
292    })?;
293    let id = connection_id(con_val)
294        .ok_or_else(|| RError::new(RErrorKind::Argument, "invalid connection".to_string()))?;
295    let mode = call_args
296        .optional_string("open", 1)
297        .unwrap_or_else(|| "r".to_string());
298
299    let interp = context.interpreter();
300    interp.with_connection_mut(id, |conn| {
301        if !conn.is_open {
302            conn.mode = mode;
303            conn.is_open = true;
304        }
305    })?;
306    Ok(connection_value(id))
307}
308
309/// Close a connection or progress bar.
310///
311/// If the argument has class `"txtProgressBar"`, finishes the bar and removes it.
312/// Otherwise, marks the connection as closed. For TCP socket connections, also
313/// shuts down and removes the underlying stream. Returns invisible NULL.
314///
315/// @param con integer scalar: connection or progress bar ID
316/// @return NULL (invisibly)
317#[interpreter_builtin(name = "close", min_args = 1)]
318fn interp_close(
319    args: &[RValue],
320    named: &[(String, RValue)],
321    context: &BuiltinContext,
322) -> Result<RValue, RError> {
323    let call_args = CallArgs::new(args, named);
324    let con_val = call_args.value("con", 0).ok_or_else(|| {
325        RError::new(
326            RErrorKind::Argument,
327            "argument 'con' is missing".to_string(),
328        )
329    })?;
330
331    // Dispatch to progress bar close if the argument carries class "txtProgressBar".
332    #[cfg(feature = "progress")]
333    if super::progress::is_progress_bar(con_val) {
334        let id = con_val
335            .as_vector()
336            .and_then(|v| v.as_integer_scalar())
337            .and_then(|i| usize::try_from(i).ok())
338            .ok_or_else(|| {
339                RError::new(
340                    RErrorKind::Argument,
341                    "invalid txtProgressBar object".to_string(),
342                )
343            })?;
344        let interp = context.interpreter();
345        if !interp.close_progress_bar(id) {
346            return Err(RError::new(
347                RErrorKind::Argument,
348                format!("progress bar {id} has already been closed or does not exist"),
349            ));
350        }
351        interp.set_invisible();
352        return Ok(RValue::Null);
353    }
354
355    let id = connection_id(con_val)
356        .ok_or_else(|| RError::new(RErrorKind::Argument, "invalid connection".to_string()))?;
357
358    let interp = context.interpreter();
359
360    // Check the connection kind and clean up associated resources.
361    let kind = interp
362        .get_connection(id)
363        .map(|c| c.kind.clone())
364        .unwrap_or(ConnectionKind::File);
365    match kind {
366        ConnectionKind::TcpClient => {
367            if let Some(stream) = interp.take_tcp_stream(id) {
368                drop(stream.shutdown(Shutdown::Both));
369            }
370        }
371        #[cfg(feature = "tls")]
372        ConnectionKind::Url => {
373            interp.take_url_body(id);
374        }
375        _ => {}
376    }
377
378    interp.with_connection_mut(id, |conn| {
379        conn.is_open = false;
380        conn.mode.clear();
381    })?;
382    Ok(RValue::Null)
383}
384
385/// Test whether a connection is open.
386///
387/// @param con integer scalar: connection ID
388/// @return logical scalar: TRUE if open, FALSE if closed
389#[interpreter_builtin(name = "isOpen", min_args = 1)]
390fn interp_is_open(
391    args: &[RValue],
392    named: &[(String, RValue)],
393    context: &BuiltinContext,
394) -> Result<RValue, RError> {
395    let call_args = CallArgs::new(args, named);
396    let con_val = call_args.value("con", 0).ok_or_else(|| {
397        RError::new(
398            RErrorKind::Argument,
399            "argument 'con' is missing".to_string(),
400        )
401    })?;
402    let id = connection_id(con_val)
403        .ok_or_else(|| RError::new(RErrorKind::Argument, "invalid connection".to_string()))?;
404
405    let interp = context.interpreter();
406    let conn = interp
407        .get_connection(id)
408        .ok_or_else(|| RError::new(RErrorKind::Argument, format!("invalid connection id {id}")))?;
409    Ok(RValue::vec(Vector::Logical(
410        vec![Some(conn.is_open)].into(),
411    )))
412}
413
414/// Read text lines from a file path or a connection.
415///
416/// If `con` is a character string, reads directly from that file path.
417/// If `con` is an integer with class "connection", reads from the
418/// connection's stored file path or TCP socket.
419///
420/// @param con character scalar or connection integer: source to read from
421/// @param n integer scalar: maximum number of lines to read (-1 for all)
422/// @return character vector with one element per line
423#[interpreter_builtin(name = "readLines", min_args = 1)]
424fn interp_read_lines(
425    args: &[RValue],
426    named: &[(String, RValue)],
427    context: &BuiltinContext,
428) -> Result<RValue, RError> {
429    let call_args = CallArgs::new(args, named);
430    let con_val = call_args.value("con", 0).ok_or_else(|| {
431        RError::new(
432            RErrorKind::Argument,
433            "argument 'con' is missing".to_string(),
434        )
435    })?;
436
437    let n = call_args.integer_or("n", 1, -1);
438
439    if is_connection(con_val) {
440        let id = connection_id(con_val)
441            .ok_or_else(|| RError::new(RErrorKind::Argument, "invalid connection".to_string()))?;
442        let interp = context.interpreter();
443        let conn = interp.get_connection(id).ok_or_else(|| {
444            RError::new(RErrorKind::Argument, format!("invalid connection id {id}"))
445        })?;
446
447        match conn.kind {
448            ConnectionKind::TcpClient => {
449                // Read from TCP socket — read available data, split into lines.
450                let data = interp.with_tcp_stream(id, |stream| {
451                    let mut buf = vec![0u8; 65536];
452                    let bytes_read = stream.read(&mut buf).map_err(|e| {
453                        RError::new(
454                            RErrorKind::Other,
455                            format!("error reading from socket '{}': {}", conn.description, e),
456                        )
457                    })?;
458                    Ok(String::from_utf8_lossy(&buf[..bytes_read]).into_owned())
459                })?;
460
461                let lines: Vec<Option<String>> = if n < 0 {
462                    data.lines().map(|l| Some(l.to_string())).collect()
463                } else {
464                    data.lines()
465                        .take(usize::try_from(n).unwrap_or(usize::MAX))
466                        .map(|l| Some(l.to_string()))
467                        .collect()
468                };
469                return Ok(RValue::vec(Vector::Character(lines.into())));
470            }
471            #[cfg(feature = "tls")]
472            ConnectionKind::Url => {
473                // Read from URL connection — body was fetched eagerly on open.
474                let body = interp.get_url_body(id).ok_or_else(|| {
475                    RError::new(
476                        RErrorKind::Other,
477                        format!(
478                            "URL connection {} ('{}') has no buffered content — \
479                             make sure to open the connection before reading",
480                            id, conn.description
481                        ),
482                    )
483                })?;
484                let data = String::from_utf8_lossy(&body);
485                let lines: Vec<Option<String>> = if n < 0 {
486                    data.lines().map(|l| Some(l.to_string())).collect()
487                } else {
488                    data.lines()
489                        .take(usize::try_from(n).unwrap_or(usize::MAX))
490                        .map(|l| Some(l.to_string()))
491                        .collect()
492                };
493                return Ok(RValue::vec(Vector::Character(lines.into())));
494            }
495            #[cfg(not(feature = "tls"))]
496            ConnectionKind::Url => {
497                return Err(RError::new(
498                    RErrorKind::Other,
499                    "URL connections require the 'tls' feature — \
500                     rebuild miniR with --features tls to enable HTTPS support"
501                        .to_string(),
502                ));
503            }
504            ConnectionKind::StdStream => {
505                return Err(RError::new(
506                    RErrorKind::Argument,
507                    format!(
508                        "cannot read from '{}' — standard stream connections are not supported for readLines",
509                        conn.description
510                    ),
511                ));
512            }
513            ConnectionKind::File => {
514                if conn.path.is_empty() {
515                    return Err(RError::new(
516                        RErrorKind::Argument,
517                        "connection has no file path".to_string(),
518                    ));
519                }
520                // Fall through to file reading below.
521            }
522        }
523    }
524
525    // File path reading — either from string argument or file connection.
526    // Uses bstr to read raw bytes and handle mixed/non-UTF-8 encodings gracefully.
527    let path = if is_connection(con_val) {
528        let id = connection_id(con_val).ok_or_else(|| {
529            RError::new(
530                RErrorKind::Argument,
531                "invalid connection object".to_string(),
532            )
533        })?;
534        let interp = context.interpreter();
535        let conn = interp.get_connection(id).ok_or_else(|| {
536            RError::new(RErrorKind::Argument, format!("connection {id} is not open"))
537        })?;
538        conn.path.clone()
539    } else {
540        call_args.string("con", 0)?
541    };
542    let path = resolved_path_string(context, &path);
543
544    // Read as raw bytes via bstr so we can handle non-UTF-8 files
545    let raw_bytes = std::fs::read(&path).map_err(|e| {
546        RError::new(
547            RErrorKind::Other,
548            format!("cannot open file '{}': {}", path, e),
549        )
550    })?;
551
552    // Use bstr's lines() which handles \n, \r\n, and \r line endings on
553    // arbitrary byte strings, then lossy-convert each line to UTF-8.
554    // Invalid byte sequences become U+FFFD (replacement character) instead
555    // of causing an error.
556    let lines: Vec<Option<String>> = if n < 0 {
557        raw_bytes
558            .lines()
559            .map(|line| Some(line.to_str_lossy().into_owned()))
560            .collect()
561    } else {
562        raw_bytes
563            .lines()
564            .take(usize::try_from(n).unwrap_or(usize::MAX))
565            .map(|line| Some(line.to_str_lossy().into_owned()))
566            .collect()
567    };
568    Ok(RValue::vec(Vector::Character(lines.into())))
569}
570
571/// Write text lines to a file path, connection, or stdout.
572///
573/// If `con` is a character string, writes to that file path.
574/// If `con` is an integer with class "connection", writes to the
575/// connection's stored file path, stdout for connection 1, or TCP socket.
576/// If `con` is omitted, writes to stdout.
577///
578/// @param text character vector of lines to write
579/// @param con character scalar or connection integer: destination
580/// @param sep character scalar: line separator (default "\n")
581/// @return NULL (invisibly)
582#[interpreter_builtin(name = "writeLines", min_args = 1)]
583fn interp_write_lines(
584    args: &[RValue],
585    named: &[(String, RValue)],
586    context: &BuiltinContext,
587) -> Result<RValue, RError> {
588    let call_args = CallArgs::new(args, named);
589    let text = args
590        .first()
591        .and_then(|v| v.as_vector())
592        .map(|v| v.to_characters())
593        .unwrap_or_default();
594    let sep = call_args
595        .named_string("sep")
596        .unwrap_or_else(|| "\n".to_string());
597
598    let output: String = text
599        .iter()
600        .map(|s| s.clone().unwrap_or_else(|| "NA".to_string()))
601        .join(&sep);
602
603    // Determine destination from `con` argument (position 1).
604    let con_val = call_args.value("con", 1);
605
606    enum Dest {
607        Stdout,
608        File(String),
609        TcpSocket(usize),
610    }
611
612    let dest = match con_val {
613        Some(val) if is_connection(val) => {
614            let id = connection_id(val).ok_or_else(|| {
615                RError::new(RErrorKind::Argument, "invalid connection".to_string())
616            })?;
617            let interp = context.interpreter();
618            let conn = interp.get_connection(id).ok_or_else(|| {
619                RError::new(RErrorKind::Argument, format!("invalid connection id {id}"))
620            })?;
621            match conn.kind {
622                ConnectionKind::TcpClient => Dest::TcpSocket(id),
623                ConnectionKind::StdStream => {
624                    if id == 1 {
625                        Dest::Stdout
626                    } else {
627                        return Err(RError::new(
628                            RErrorKind::Argument,
629                            format!(
630                                "cannot write to '{}' — only stdout() is supported for writeLines",
631                                conn.description
632                            ),
633                        ));
634                    }
635                }
636                ConnectionKind::File => {
637                    if conn.path.is_empty() {
638                        return Err(RError::new(
639                            RErrorKind::Argument,
640                            "connection has no file path".to_string(),
641                        ));
642                    }
643                    Dest::File(conn.path.clone())
644                }
645                ConnectionKind::Url => {
646                    return Err(RError::new(
647                        RErrorKind::Argument,
648                        format!(
649                            "cannot write to URL connection '{}' — URL connections are read-only",
650                            conn.description
651                        ),
652                    ));
653                }
654            }
655        }
656        Some(val) => {
657            let path = val
658                .as_vector()
659                .and_then(|v| v.as_character_scalar())
660                .ok_or_else(|| {
661                    RError::new(RErrorKind::Argument, "invalid 'con' argument".to_string())
662                })?;
663            Dest::File(path)
664        }
665        None => Dest::Stdout,
666    };
667
668    match dest {
669        Dest::Stdout => {
670            context.write(&format!("{}\n", output));
671        }
672        Dest::File(path) => {
673            let path = resolved_path_string(context, &path);
674            std::fs::write(&path, format!("{}{}", output, sep)).map_err(|e| {
675                RError::new(
676                    RErrorKind::Other,
677                    format!("cannot write to file '{}': {}", path, e),
678                )
679            })?;
680        }
681        Dest::TcpSocket(id) => {
682            let interp = context.interpreter();
683            let payload = format!("{}{}", output, sep);
684            interp.with_tcp_stream(id, |stream| {
685                stream.write_all(payload.as_bytes()).map_err(|e| {
686                    RError::new(RErrorKind::Other, format!("error writing to socket: {}", e))
687                })
688            })?;
689        }
690    }
691    Ok(RValue::Null)
692}
693
694/// Return connection 0 (standard input).
695///
696/// @return integer scalar with class "connection" (value 0)
697#[interpreter_builtin(name = "stdin", min_args = 0, max_args = 0)]
698fn interp_stdin(
699    _args: &[RValue],
700    _named: &[(String, RValue)],
701    context: &BuiltinContext,
702) -> Result<RValue, RError> {
703    context.interpreter().ensure_connections();
704    Ok(connection_value(0))
705}
706
707/// Return connection 1 (standard output).
708///
709/// @return integer scalar with class "connection" (value 1)
710#[interpreter_builtin(name = "stdout", min_args = 0, max_args = 0)]
711fn interp_stdout(
712    _args: &[RValue],
713    _named: &[(String, RValue)],
714    context: &BuiltinContext,
715) -> Result<RValue, RError> {
716    context.interpreter().ensure_connections();
717    Ok(connection_value(1))
718}
719
720/// Return connection 2 (standard error).
721///
722/// @return integer scalar with class "connection" (value 2)
723#[interpreter_builtin(name = "stderr", min_args = 0, max_args = 0)]
724fn interp_stderr(
725    _args: &[RValue],
726    _named: &[(String, RValue)],
727    context: &BuiltinContext,
728) -> Result<RValue, RError> {
729    context.interpreter().ensure_connections();
730    Ok(connection_value(2))
731}
732
733// endregion
734
735// region: TCP socket builtins
736
737/// Create a TCP client socket connection.
738///
739/// Connects to the specified host and port via TCP. Only client mode
740/// (`server = FALSE`) is currently supported. Returns a connection ID
741/// with class "connection".
742///
743/// @param host character scalar: hostname or IP address to connect to
744/// @param port integer scalar: port number
745/// @param server logical scalar: whether to create a server socket (only FALSE supported)
746/// @return integer scalar with class "connection"
747#[interpreter_builtin(name = "make.socket", min_args = 2, namespace = "net")]
748fn interp_make_socket(
749    args: &[RValue],
750    named: &[(String, RValue)],
751    context: &BuiltinContext,
752) -> Result<RValue, RError> {
753    let call_args = CallArgs::new(args, named);
754    let host = call_args.string("host", 0)?;
755    let port = call_args.integer_or("port", 1, -1);
756    let server = call_args.logical_flag("server", 2, false);
757
758    if server {
759        return Err(RError::new(
760            RErrorKind::Argument,
761            "make.socket() with server = TRUE is not yet supported — \
762             only client sockets are implemented. Use server = FALSE (the default)."
763                .to_string(),
764        ));
765    }
766
767    if !(0..=65535).contains(&port) {
768        return Err(RError::new(
769            RErrorKind::Argument,
770            format!("invalid port number {port} — must be between 0 and 65535"),
771        ));
772    }
773    let port_u16 = u16::try_from(port)
774        .map_err(|_| RError::new(RErrorKind::Argument, format!("invalid port number {port}")))?;
775
776    let stream = TcpStream::connect((host.as_str(), port_u16)).map_err(|e| {
777        RError::new(
778            RErrorKind::Other,
779            format!(
780                "cannot connect to {}:{} — {}. \
781                 Check that the host is reachable and the port is open.",
782                host, port_u16, e
783            ),
784        )
785    })?;
786
787    let interp = context.interpreter();
788    let description = format!("{}:{}", host, port_u16);
789    let info = ConnectionInfo::tcp_client(description);
790    let id = interp.add_connection(info);
791    interp.store_tcp_stream(id, stream);
792
793    Ok(connection_value(id))
794}
795
796/// Read up to `maxlen` bytes from a TCP socket connection.
797///
798/// Returns the data as a character string. If no data is available, blocks
799/// until data arrives or the connection is closed.
800///
801/// @param socket integer scalar: connection ID of a TCP socket
802/// @param maxlen integer scalar: maximum number of bytes to read (default 256)
803/// @return character scalar containing the data read
804#[interpreter_builtin(name = "read.socket", min_args = 1, namespace = "net")]
805fn interp_read_socket(
806    args: &[RValue],
807    named: &[(String, RValue)],
808    context: &BuiltinContext,
809) -> Result<RValue, RError> {
810    let call_args = CallArgs::new(args, named);
811    let con_val = call_args.value("socket", 0).ok_or_else(|| {
812        RError::new(
813            RErrorKind::Argument,
814            "argument 'socket' is missing".to_string(),
815        )
816    })?;
817    let id = connection_id(con_val).ok_or_else(|| {
818        RError::new(
819            RErrorKind::Argument,
820            "invalid socket connection".to_string(),
821        )
822    })?;
823    let maxlen = call_args.integer_or("maxlen", 1, 256);
824
825    let interp = context.interpreter();
826
827    // Verify this is actually a TCP connection.
828    let conn = interp
829        .get_connection(id)
830        .ok_or_else(|| RError::new(RErrorKind::Argument, format!("invalid connection id {id}")))?;
831    if conn.kind != ConnectionKind::TcpClient {
832        return Err(RError::new(
833            RErrorKind::Argument,
834            format!(
835                "connection {} ('{}') is not a socket — read.socket() requires a TCP socket connection",
836                id, conn.description
837            ),
838        ));
839    }
840    if !conn.is_open {
841        return Err(RError::new(
842            RErrorKind::Argument,
843            format!(
844                "socket connection {} ('{}') is not open",
845                id, conn.description
846            ),
847        ));
848    }
849
850    let buf_size = usize::try_from(maxlen).unwrap_or(256);
851    let data = interp.with_tcp_stream(id, |stream| {
852        let mut buf = vec![0u8; buf_size];
853        let bytes_read = stream.read(&mut buf).map_err(|e| {
854            RError::new(
855                RErrorKind::Other,
856                format!("error reading from socket: {}", e),
857            )
858        })?;
859        Ok(String::from_utf8_lossy(&buf[..bytes_read]).into_owned())
860    })?;
861
862    Ok(RValue::vec(Vector::Character(vec![Some(data)].into())))
863}
864
865/// Write a string to a TCP socket connection.
866///
867/// Writes all bytes of the string to the socket. Returns invisible NULL.
868///
869/// @param socket integer scalar: connection ID of a TCP socket
870/// @param string character scalar: data to write
871/// @return NULL (invisibly)
872#[interpreter_builtin(name = "write.socket", min_args = 2, namespace = "net")]
873fn interp_write_socket(
874    args: &[RValue],
875    named: &[(String, RValue)],
876    context: &BuiltinContext,
877) -> Result<RValue, RError> {
878    let call_args = CallArgs::new(args, named);
879    let con_val = call_args.value("socket", 0).ok_or_else(|| {
880        RError::new(
881            RErrorKind::Argument,
882            "argument 'socket' is missing".to_string(),
883        )
884    })?;
885    let id = connection_id(con_val).ok_or_else(|| {
886        RError::new(
887            RErrorKind::Argument,
888            "invalid socket connection".to_string(),
889        )
890    })?;
891    let data = call_args.string("string", 1)?;
892
893    let interp = context.interpreter();
894
895    // Verify this is actually a TCP connection.
896    let conn = interp
897        .get_connection(id)
898        .ok_or_else(|| RError::new(RErrorKind::Argument, format!("invalid connection id {id}")))?;
899    if conn.kind != ConnectionKind::TcpClient {
900        return Err(RError::new(
901            RErrorKind::Argument,
902            format!(
903                "connection {} ('{}') is not a socket — write.socket() requires a TCP socket connection",
904                id, conn.description
905            ),
906        ));
907    }
908    if !conn.is_open {
909        return Err(RError::new(
910            RErrorKind::Argument,
911            format!(
912                "socket connection {} ('{}') is not open",
913                id, conn.description
914            ),
915        ));
916    }
917
918    interp.with_tcp_stream(id, |stream| {
919        stream
920            .write_all(data.as_bytes())
921            .map_err(|e| RError::new(RErrorKind::Other, format!("error writing to socket: {}", e)))
922    })?;
923
924    Ok(RValue::Null)
925}
926
927/// Close a TCP socket connection.
928///
929/// Shuts down the TCP stream and removes it from the connection table.
930/// Returns invisible NULL. This is the socket-specific close — the generic
931/// `close()` also handles TCP sockets.
932///
933/// @param socket integer scalar: connection ID of a TCP socket
934/// @return NULL (invisibly)
935#[interpreter_builtin(name = "close.socket", min_args = 1, max_args = 1, namespace = "net")]
936fn interp_close_socket(
937    args: &[RValue],
938    named: &[(String, RValue)],
939    context: &BuiltinContext,
940) -> Result<RValue, RError> {
941    let call_args = CallArgs::new(args, named);
942    let con_val = call_args.value("socket", 0).ok_or_else(|| {
943        RError::new(
944            RErrorKind::Argument,
945            "argument 'socket' is missing".to_string(),
946        )
947    })?;
948    let id = connection_id(con_val).ok_or_else(|| {
949        RError::new(
950            RErrorKind::Argument,
951            "invalid socket connection".to_string(),
952        )
953    })?;
954
955    let interp = context.interpreter();
956
957    // Verify this is actually a TCP connection.
958    let conn = interp
959        .get_connection(id)
960        .ok_or_else(|| RError::new(RErrorKind::Argument, format!("invalid connection id {id}")))?;
961    if conn.kind != ConnectionKind::TcpClient {
962        return Err(RError::new(
963            RErrorKind::Argument,
964            format!(
965                "connection {} ('{}') is not a socket — close.socket() requires a TCP socket connection",
966                id, conn.description
967            ),
968        ));
969    }
970
971    // Shut down and remove the TCP stream.
972    if let Some(stream) = interp.take_tcp_stream(id) {
973        drop(stream.shutdown(Shutdown::Both));
974    }
975
976    interp.with_connection_mut(id, |conn| {
977        conn.is_open = false;
978        conn.mode.clear();
979    })?;
980
981    Ok(RValue::Null)
982}
983
984// endregion