Skip to main content

miniextendr_api/
worker.rs

//! Worker thread infrastructure for safe Rust-R FFI.
//!
//! ## Public API
//!
//! - [`with_r_thread`] — Execute a closure on R's main thread
//! - [`is_r_main_thread`] — Check if the current thread is R's main thread
//!
//! ## Feature gate: `worker-thread`
//!
//! Without the `worker-thread` cargo feature, all calls execute inline on
//! R's main thread:
//! - `with_r_thread(f)` runs `f()` directly (panics if not on main thread)
//! - `run_on_worker(f)` runs `f()` directly, returns `Ok(f())`
//!
//! With the feature enabled, a dedicated worker thread is spawned at init time.
//! `with_r_thread` routes calls from the worker back to main, and `run_on_worker`
//! dispatches closures to the worker with bidirectional communication.
//!
//! ## Initialization
//!
//! [`miniextendr_runtime_init`] must be called from R's main thread before any
//! R FFI APIs. Typically done in `R_init_<pkgname>()`.
23
24use std::sync::OnceLock;
25use std::thread;
26
27use crate::ffi::{self, SEXP};
28
/// Thread id recorded as "R's main thread".
///
/// Written at most once, by `miniextendr_runtime_init`, with the id of the
/// thread that performs the initialization; read by `is_r_main_thread` and
/// `assert_runtime_initialized`. Unset until initialization has run.
static R_MAIN_THREAD_ID: OnceLock<thread::ThreadId> = OnceLock::new();
30
31// region: Public API
32
/// Transparent wrapper that asserts `Send` for its payload, used to hand
/// values over to the main thread.
///
/// The wrapper adds no synchronization of its own. It is only sound when the
/// wrapped value is never touched on the worker thread and is used exclusively
/// on the main thread; upholding that is the responsibility of whoever
/// constructs a `Sendable`.
#[doc(hidden)]
#[repr(transparent)]
#[derive(Clone, Copy)]
pub struct Sendable<T>(pub T);

// SAFETY: not checked by the compiler. This impl relies on the usage contract
// documented on `Sendable`: the payload is only ever accessed on the main
// thread, so it is never actually shared with or used from another thread.
unsafe impl<T> Send for Sendable<T> {}
43
44/// Check if the current thread is R's main thread.
45///
46/// Returns `true` if called from the main R thread, `false` otherwise.
47/// Before `miniextendr_runtime_init()` is called, always returns `false`.
48#[inline(always)]
49pub fn is_r_main_thread() -> bool {
50    R_MAIN_THREAD_ID
51        .get()
52        .map(|&id| id == std::thread::current().id())
53        .unwrap_or(false)
54}
55
56/// Execute a closure on R's main thread, returning the result.
57///
58/// This function can be called from any thread:
59/// - From the main thread: executes the closure directly (re-entrant)
60/// - From the worker thread (during `run_on_worker`): sends the work to
61///   the main thread and blocks until completion
62///
63/// # Panics
64///
65/// - If `miniextendr_runtime_init()` hasn't been called yet
66/// - If called from a non-main thread without the `worker-thread` feature
67/// - If called from a non-main thread outside of a `run_on_worker` context
68///   (even with the `worker-thread` feature)
69///
70/// # Example
71///
72/// ```ignore
73/// use miniextendr_api::with_r_thread;
74///
75/// // From worker thread, safely call R APIs:
76/// let sexp = with_r_thread(|| {
77///     // This runs on R's main thread
78///     SEXP::nil()
79/// });
80/// ```
81pub fn with_r_thread<F, R>(f: F) -> R
82where
83    F: FnOnce() -> R + 'static,
84    R: Send + 'static,
85{
86    assert_runtime_initialized();
87
88    if is_r_main_thread() {
89        return f();
90    }
91
92    // Not on main thread — need worker-thread feature for routing
93    #[cfg(not(feature = "worker-thread"))]
94    {
95        panic!(
96            "with_r_thread called from a non-main thread without the `worker-thread` feature.\n\
97             \n\
98             Without `worker-thread`, R API calls can only happen on the R main thread.\n\
99             Either:\n\
100             - Enable the `worker-thread` cargo feature to route calls from background threads, or\n\
101             - Ensure this code only runs on the R main thread."
102        );
103    }
104
105    #[cfg(feature = "worker-thread")]
106    {
107        worker_channel::route_to_main_thread(f)
108    }
109}
110// endregion
111
112// region: #[doc(hidden)] items for macro-generated code
113
114/// Raise an R error from a panic message. Does not return.
115///
116/// If `call` is `Some(sexp)`, uses `Rf_errorcall` to include call context.
117#[doc(hidden)]
118pub fn panic_message_to_r_error(msg: String, call: Option<SEXP>) -> ! {
119    let c_msg = std::ffi::CString::new(msg)
120        .unwrap_or_else(|_| std::ffi::CString::new("Rust panic (invalid message)").unwrap());
121    unsafe {
122        match call {
123            Some(call) => ffi::Rf_errorcall_unchecked(call, c"%s".as_ptr(), c_msg.as_ptr()),
124            None => ffi::Rf_error_unchecked(c"%s".as_ptr(), c_msg.as_ptr()),
125        }
126    }
127}
128
/// Execute `f` as a job on the worker thread.
///
/// With the `worker-thread` feature the closure is dispatched to the worker;
/// the result is `Ok(T)` when it completes and `Err(String)` carrying the
/// panic message when it panicked. An `Err` is also reported to panic
/// telemetry before being returned; what to do with the error (tagged error
/// value, `Rf_errorcall`, ...) is left to the caller.
///
/// Without the feature, `f` runs inline on the current thread and its return
/// value is wrapped in `Ok`.
#[doc(hidden)]
pub fn run_on_worker<F, T>(f: F) -> Result<T, String>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    #[cfg(feature = "worker-thread")]
    {
        worker_channel::dispatch_to_worker(f).inspect_err(|msg| {
            crate::panic_telemetry::fire(msg, crate::panic_telemetry::PanicSource::Worker);
        })
    }

    #[cfg(not(feature = "worker-thread"))]
    {
        Ok(f())
    }
}
155
156/// Initialize the miniextendr runtime.
157///
158/// Records the main thread ID and (with `worker-thread`) spawns the worker.
159/// Must be called from R's main thread, typically from `R_init_<pkgname>`.
160#[doc(hidden)]
161#[unsafe(no_mangle)]
162pub extern "C-unwind" fn miniextendr_runtime_init() {
163    static RUN_ONCE: std::sync::Once = std::sync::Once::new();
164
165    #[cfg(feature = "worker-thread")]
166    {
167        RUN_ONCE.call_once_force(|x| {
168            if x.is_poisoned() {
169                eprintln!(
170                    "warning: miniextendr worker init is retrying after a previous failed attempt"
171                );
172            }
173
174            let current_id = std::thread::current().id();
175            if let Some(&existing_id) = R_MAIN_THREAD_ID.get() {
176                if existing_id != current_id {
177                    panic!(
178                        "miniextendr_runtime_init called from thread {:?}, but R_MAIN_THREAD_ID \
179                         was already set to {:?}. This indicates incorrect initialization order.",
180                        current_id, existing_id
181                    );
182                }
183            } else {
184                let _ = R_MAIN_THREAD_ID.set(current_id);
185            }
186
187            worker_channel::init_worker();
188        });
189    }
190
191    #[cfg(not(feature = "worker-thread"))]
192    {
193        RUN_ONCE.call_once(|| {
194            let _ = R_MAIN_THREAD_ID.set(std::thread::current().id());
195        });
196    }
197}
198// endregion
199
200// region: pub(crate) internals
201
/// Tell whether the calling thread currently has a worker routing context.
///
/// With the `worker-thread` feature this defers to the worker channel module;
/// without it there is no worker, so the answer is always `false`.
pub(crate) fn has_worker_context() -> bool {
    #[cfg(not(feature = "worker-thread"))]
    {
        false
    }
    #[cfg(feature = "worker-thread")]
    {
        worker_channel::has_context()
    }
}
213
214/// Panic if the runtime hasn't been initialized.
215fn assert_runtime_initialized() {
216    if R_MAIN_THREAD_ID.get().is_none() {
217        panic!(
218            "miniextendr_runtime_init() must be called before using R FFI APIs.\n\
219             \n\
220             This is typically done in R_init_<pkgname>() via:\n\
221             \n\
222             void R_init_pkgname(DllInfo *dll) {{\n\
223             miniextendr_runtime_init();\n\
224             R_registerRoutines(dll, NULL, CallEntries, NULL, NULL);\n\
225             }}\n\
226             \n\
227             If you're embedding R in Rust, call miniextendr_runtime_init() from the main thread \
228             before any R API calls."
229        );
230    }
231}
232// endregion
233
234// region: Worker channel infrastructure (only with worker-thread feature)
235
#[cfg(feature = "worker-thread")]
mod worker_channel {
    //! Channel plumbing between the dispatching thread and the single worker.
    //!
    //! `dispatch_to_worker` sends a job to the worker and then serves the
    //! worker's `with_r_thread` requests on the calling thread until the job
    //! reports `Done`.

    use std::any::Any;
    use std::cell::RefCell;
    use std::panic::{AssertUnwindSafe, catch_unwind};
    use std::sync::mpsc::{self, Receiver, SyncSender};
    use std::thread;

    use super::Sendable;
    use crate::ffi::{self, Rboolean, SEXP};

    /// A type-erased job run by the worker thread.
    type AnyJob = Box<dyn FnOnce() + Send>;

    /// Sending half of the worker's job queue; set once by [`init_worker`].
    static JOB_TX: std::sync::OnceLock<SyncSender<AnyJob>> = std::sync::OnceLock::new();

    /// Type-erased work for the main thread: a closure returning its result boxed.
    ///
    /// The closure itself is not required to be `Send`; `Sendable` is what lets
    /// it travel through the channel. It is only ever invoked by the thread
    /// that runs the `dispatch_to_worker` loop.
    type MainThreadWork = Sendable<Box<dyn FnOnce() -> Box<dyn Any + Send> + 'static>>;

    /// Reply to a work request: `Ok(result)` or `Err(message)` on panic / R error.
    type MainThreadResponse = Result<Box<dyn Any + Send>, String>;

    /// Messages sent from the worker to the dispatching thread.
    enum WorkerMessage<T> {
        /// Run this closure on the dispatching thread and send back a response.
        WorkRequest(MainThreadWork),
        /// The job finished; carries its result or its panic message.
        Done(Result<T, String>),
    }

    type TypeErasedWorkerMessage = WorkerMessage<Box<dyn Any + Send>>;
    type WorkerToMainSender = RefCell<Option<SyncSender<TypeErasedWorkerMessage>>>;
    type MainResponseReceiver = RefCell<Option<Receiver<MainThreadResponse>>>;

    // Per-thread channel ends, populated on the worker only while a job is
    // running; `None` everywhere else.
    thread_local! {
        static WORKER_TO_MAIN_TX: WorkerToMainSender = const { RefCell::new(None) };
        static MAIN_RESPONSE_RX: MainResponseReceiver = const { RefCell::new(None) };
    }

    /// `true` while the calling thread is executing a `dispatch_to_worker` job,
    /// i.e. while its worker-to-main sender is installed.
    pub(super) fn has_context() -> bool {
        WORKER_TO_MAIN_TX.with_borrow(|slot| slot.is_some())
    }

    /// Route a closure from the worker thread to the main thread.
    ///
    /// Sends `f` as a `WorkRequest`, then blocks until the response arrives.
    ///
    /// # Panics
    ///
    /// - If the calling thread has no worker context (no channel installed)
    /// - If either channel is closed
    /// - If the main thread reports an error for this request (a panic in `f`
    ///   or an R error); the message is re-raised as a panic here
    pub(super) fn route_to_main_thread<F, R>(f: F) -> R
    where
        F: FnOnce() -> R + 'static,
        R: Send + 'static,
    {
        {
            let request_tx = WORKER_TO_MAIN_TX.with_borrow(|slot| {
                slot.as_ref()
                    .expect("`with_r_thread` called outside of `run_on_worker` context")
                    .clone()
            });

            let work: MainThreadWork =
                Sendable(Box::new(move || Box::new(f()) as Box<dyn Any + Send>));

            request_tx
                .send(WorkerMessage::WorkRequest(work))
                .expect("main thread channel closed");
        }

        let response = MAIN_RESPONSE_RX.with_borrow(|slot| {
            slot.as_ref()
                .expect("response channel not set")
                .recv()
                .expect("main thread response channel closed")
        });

        match response {
            Ok(boxed) => *boxed
                .downcast::<R>()
                .expect("type mismatch in `with_r_thread` response"),
            Err(panic_msg) => panic!("panic in `with_r_thread`: {}", panic_msg),
        }
    }

    /// Dispatch a closure to the worker thread and wait for it to finish.
    ///
    /// While waiting, the calling thread executes every `with_r_thread`
    /// request the job makes. Returns `Ok(T)` on completion or
    /// `Err(panic_message)` if the job panicked.
    ///
    /// If a `with_r_thread` closure raises an R error, this function does not
    /// return: R's unwind is continued from inside the helper.
    ///
    /// # Panics
    ///
    /// - If called from inside a worker job (re-entry)
    /// - If the worker has not been initialized or its channels are closed
    pub(super) fn dispatch_to_worker<F, T>(f: F) -> Result<T, String>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        // Re-entry guard: a job that calls run_on_worker again would wait for
        // the very worker thread it is occupying.
        if has_context() {
            panic!(
                "run_on_worker called re-entrantly from within a worker context.\n\
                 \n\
                 The single worker thread is already executing a job, so a nested \
                 run_on_worker would deadlock. To call R APIs from worker code, \
                 use with_r_thread() instead."
            );
        }

        let job_tx = JOB_TX
            .get()
            .expect("worker not initialized - call miniextendr_runtime_init first");

        // worker -> dispatcher: work requests and the final `Done`. The worker
        // blocks for a response after each request, so one buffered slot is
        // enough and lets the final `Done` be sent without a receiver waiting
        // (e.g. after the dispatcher left through an R longjmp).
        let (worker_tx, worker_rx) = mpsc::sync_channel::<TypeErasedWorkerMessage>(1);

        // dispatcher -> worker: one response per work request. The buffered
        // slot lets the cleanup handler send without blocking mid-longjmp.
        let (response_tx, response_rx) = mpsc::sync_channel::<MainThreadResponse>(1);

        let job: AnyJob = Box::new(move || {
            // Install the channel ends that `route_to_main_thread` looks up.
            WORKER_TO_MAIN_TX.set(Some(worker_tx.clone()));
            MAIN_RESPONSE_RX.set(Some(response_rx));

            let outcome = catch_unwind(AssertUnwindSafe(f));

            // Leave the worker context before reporting the result.
            WORKER_TO_MAIN_TX.set(None);
            MAIN_RESPONSE_RX.set(None);

            let to_send: Result<Box<dyn Any + Send>, String> = match outcome {
                Ok(val) => Ok(Box::new(val)),
                Err(payload) => Err(crate::unwind_protect::panic_payload_to_string(&*payload)),
            };
            // Best effort: the dispatcher may no longer be listening.
            let _ = worker_tx.send(WorkerMessage::Done(to_send));
        });

        job_tx.send(job).expect("worker thread dead");

        // Serve work requests until the job reports `Done`. Each request gets
        // exactly one response: either from here, or from the cleanup handler
        // inside `execute_main_thread_work` when R raises an error (in which
        // case that call never returns).
        loop {
            match worker_rx
                .recv()
                .expect("worker channel closed unexpectedly")
            {
                WorkerMessage::WorkRequest(work) => {
                    let response = execute_main_thread_work(work, &response_tx);
                    response_tx
                        .send(response)
                        .expect("worker response channel closed");
                }
                WorkerMessage::Done(result) => {
                    return result.map(|boxed| {
                        *boxed
                            .downcast::<T>()
                            .expect("type mismatch in run_on_worker result")
                    });
                }
            }
        }
    }

    /// Run one work request under `R_UnwindProtect` on the calling thread.
    ///
    /// Outcomes:
    /// - closure returned: `Ok(result)`
    /// - closure panicked: `Err(panic message)`
    /// - R raised an error: the cleanup handler sends `Err(..)` through
    ///   `response_tx` itself, after which R's unwind is continued and this
    ///   function does not return
    fn execute_main_thread_work(
        work: MainThreadWork,
        response_tx: &SyncSender<MainThreadResponse>,
    ) -> MainThreadResponse {
        /// Panic payload raised by `cleanup_handler` to signal "R is unwinding".
        struct RErrorMarker;

        /// State shared with the two C callbacks through a raw pointer.
        struct CallData {
            work: Option<MainThreadWork>,
            result: Option<Box<dyn Any + Send>>,
            panic_payload: Option<Box<dyn Any + Send>>,
            response_tx_ptr: *const SyncSender<MainThreadResponse>,
        }

        /// Protected body: runs the closure and stores its result or panic.
        unsafe extern "C-unwind" fn trampoline(data: *mut std::ffi::c_void) -> SEXP {
            assert!(!data.is_null(), "trampoline: data pointer is null");
            // SAFETY: `data` is the `CallData` allocation created below; it is
            // not freed until after `R_UnwindProtect_C_unwind` has returned or
            // unwound.
            let data = unsafe { &mut *data.cast::<CallData>() };
            let work = data
                .work
                .take()
                .expect("trampoline: work already consumed")
                .0;

            match catch_unwind(AssertUnwindSafe(work)) {
                Ok(result) => data.result = Some(result),
                Err(payload) => data.panic_payload = Some(payload),
            }
            SEXP::nil()
        }

        /// Cleanup callback: only acts when R is about to jump (`jump` set).
        unsafe extern "C-unwind" fn cleanup_handler(
            data: *mut std::ffi::c_void,
            jump: Rboolean,
        ) {
            if jump == Rboolean::FALSE {
                return;
            }

            // R is about to longjmp. The worker is blocked waiting for a
            // response, so one must be sent before the unwind continues.
            assert!(!data.is_null(), "cleanup_handler: data pointer is null");
            // SAFETY: same allocation as in `trampoline`, still alive here.
            let data = unsafe { &*data.cast::<CallData>() };
            // SAFETY: points at the sender borrowed by
            // `execute_main_thread_work`, whose frame is still on the stack.
            let response_tx = unsafe { &*data.response_tx_ptr };

            #[cfg(feature = "nonapi")]
            let error_msg = unsafe {
                let buf = ffi::R_curErrorBuf();
                if buf.is_null() {
                    "R error occurred".to_string()
                } else {
                    std::ffi::CStr::from_ptr(buf).to_string_lossy().into_owned()
                }
            };
            #[cfg(not(feature = "nonapi"))]
            let error_msg = "R error occurred".to_string();

            let _ = response_tx.send(Err(error_msg));
            // Unwind back to the `catch_unwind` around R_UnwindProtect.
            std::panic::panic_any(RErrorMarker);
        }

        // SAFETY: `data` comes from `Box::into_raw` and is reclaimed exactly
        // once with `Box::from_raw` after the protected call has returned or
        // unwound; the callbacks only dereference it in between.
        unsafe {
            let token = crate::unwind_protect::get_continuation_token();

            let data = Box::into_raw(Box::new(CallData {
                work: Some(work),
                result: None,
                panic_payload: None,
                response_tx_ptr: std::ptr::from_ref(response_tx),
            }));

            let protect_outcome = catch_unwind(AssertUnwindSafe(|| {
                ffi::R_UnwindProtect_C_unwind(
                    Some(trampoline),
                    data.cast(),
                    Some(cleanup_handler),
                    data.cast(),
                    token,
                )
            }));

            let mut data = Box::from_raw(data);

            match protect_outcome {
                // Protected call returned: the closure either finished or its
                // panic was captured by the trampoline.
                Ok(_) => match data.panic_payload.take() {
                    Some(payload) => {
                        Err(crate::unwind_protect::panic_payload_to_string(&*payload))
                    }
                    None => Ok(data
                        .result
                        .take()
                        .expect("result not set after successful completion")),
                },
                Err(payload) => {
                    // R error: the cleanup handler already answered the
                    // worker, so free our state and let R keep unwinding.
                    if payload.downcast_ref::<RErrorMarker>().is_some() {
                        drop(data);
                        ffi::R_ContinueUnwind(token);
                    }
                    // Any other panic is reported to the worker as an error.
                    Err(crate::unwind_protect::panic_payload_to_string(&*payload))
                }
            }
        }
    }

    /// Spawn the worker thread and publish its job channel.
    ///
    /// Idempotent: if the channel is already published, nothing happens.
    ///
    /// # Panics
    ///
    /// If the worker thread cannot be spawned.
    pub(super) fn init_worker() {
        JOB_TX.get_or_init(|| {
            // Capacity 0 (rendezvous): a send completes only once the worker
            // has picked the job up, so at most one job is in flight.
            let (job_tx, job_rx) = mpsc::sync_channel::<AnyJob>(0);
            thread::Builder::new()
                .name("miniextendr-worker".into())
                .spawn(move || {
                    // Runs until every sender has been dropped.
                    for job in job_rx {
                        job();
                    }
                })
                .expect("failed to spawn worker thread");
            job_tx
        });
    }
}
539// endregion
540
541// region: Tests
542
#[cfg(test)]
mod tests {
    use super::*;

    /// `Sendable` must be `Send` even around a payload that is not.
    #[test]
    fn sendable_is_send() {
        fn require_send<T: Send>() {}
        require_send::<Sendable<*const u8>>();
    }

    /// Before init, `with_r_thread` must panic with the init hint. If some
    /// other test already initialized the runtime, fall back to checking that
    /// a call from a freshly spawned thread panics.
    #[test]
    fn with_r_thread_panics_before_init() {
        if R_MAIN_THREAD_ID.get().is_none() {
            let result = std::panic::catch_unwind(|| {
                with_r_thread(|| 42);
            });
            assert!(result.is_err());
            let payload = result.unwrap_err();
            let msg = crate::unwind_protect::panic_payload_to_string(payload.as_ref());
            assert!(
                msg.contains("miniextendr_runtime_init"),
                "expected init error message, got: {msg}"
            );
            return;
        }

        // Runtime already initialized: the pre-init path is unreachable here.
        let worker = std::thread::spawn(|| std::panic::catch_unwind(|| with_r_thread(|| 42)));
        let outcome = worker.join().expect("thread panicked outside catch_unwind");
        assert!(
            outcome.is_err(),
            "with_r_thread should panic from non-main thread"
        );
    }

    /// A plain test thread is never inside a worker job.
    #[test]
    fn has_worker_context_false_outside_worker() {
        assert!(!has_worker_context());
    }

    // region: Feature-gated tests: worker-thread

    #[cfg(feature = "worker-thread")]
    mod worker_tests {
        use super::*;

        /// A nested `run_on_worker` inside a worker job must be rejected with
        /// a panic rather than hanging forever.
        #[test]
        fn run_on_worker_reentry_panics_not_deadlocks() {
            miniextendr_runtime_init();

            let (report_tx, report_rx) =
                std::sync::mpsc::sync_channel::<Result<String, String>>(1);

            std::thread::spawn(move || {
                let outcome = run_on_worker(|| {
                    // Re-entry: this is on the worker thread already.
                    run_on_worker(|| 42).unwrap();
                });
                let report = match outcome {
                    Err(msg) => Ok(msg),
                    Ok(()) => Err("re-entry was not detected".into()),
                };
                let _ = report_tx.send(report);
            });

            let report = match report_rx.recv_timeout(std::time::Duration::from_secs(5)) {
                Ok(report) => report,
                Err(_) => {
                    panic!("DEADLOCK: run_on_worker re-entry caused the test to hang for 5 seconds")
                }
            };

            match report {
                Ok(msg) => assert!(
                    msg.contains("re-entr") || msg.contains("Re-entr"),
                    "expected re-entry error, got: {msg}"
                ),
                Err(msg) => panic!("{msg}"),
            }
        }
    }
    // endregion

    // region: Feature-gated tests: no worker-thread (stubs)

    #[cfg(not(feature = "worker-thread"))]
    mod stub_tests {
        use super::*;

        /// On the recorded main thread, `with_r_thread` runs the closure in place.
        #[test]
        fn stub_with_r_thread_inline() {
            miniextendr_runtime_init();
            // A parallel test may have recorded a different thread as "main"
            // first; in that case this thread would rightfully panic, so only
            // exercise the call when this thread is the recorded one.
            if is_r_main_thread() {
                assert_eq!(with_r_thread(|| 42), 42);
            }
        }

        /// Without a worker, `run_on_worker` just wraps the return value.
        #[test]
        fn stub_run_on_worker_inline() {
            assert_eq!(run_on_worker(|| 123), Ok(123));
        }

        /// Without `worker-thread`, a call from a non-main thread must panic
        /// instead of silently running inline.
        #[test]
        fn stub_with_r_thread_panics_on_wrong_thread() {
            miniextendr_runtime_init();

            let outcome = std::thread::spawn(|| {
                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| with_r_thread(|| 42)))
            })
            .join()
            .expect("thread panicked outside catch_unwind");

            assert!(
                outcome.is_err(),
                "with_r_thread should panic when called from a non-main thread \
                 without the worker-thread feature, but it ran inline silently"
            );
        }
    }
    // endregion
}
673// endregion