miniextendr_api/
worker.rs1use std::sync::OnceLock;
25use std::thread;
26
27use crate::ffi::{self, SEXP};
28
/// Thread ID recorded by `miniextendr_runtime_init` as the R main thread.
///
/// Stays unset until initialization has run; `is_r_main_thread` compares the calling
/// thread against this value.
static R_MAIN_THREAD_ID: OnceLock<thread::ThreadId> = OnceLock::new();
30
/// Transparent wrapper that makes any `T` count as [`Send`], whether or not `T` itself is.
///
/// The wrapper adds no synchronization and no runtime checks; it only changes what the
/// compiler allows to cross a thread boundary.
#[doc(hidden)]
#[repr(transparent)]
#[derive(Clone, Copy)]
pub struct Sendable<T>(pub T);

// SAFETY: NOTE(review) — this impl is unconditional, so the compiler verifies nothing here.
// Soundness depends on every user of `Sendable` ensuring the wrapped value is only touched
// on a thread where that is valid; confirm at each construction site.
unsafe impl<T> Send for Sendable<T> {}
43
44#[inline(always)]
49pub fn is_r_main_thread() -> bool {
50 R_MAIN_THREAD_ID
51 .get()
52 .map(|&id| id == std::thread::current().id())
53 .unwrap_or(false)
54}
55
56pub fn with_r_thread<F, R>(f: F) -> R
82where
83 F: FnOnce() -> R + 'static,
84 R: Send + 'static,
85{
86 assert_runtime_initialized();
87
88 if is_r_main_thread() {
89 return f();
90 }
91
92 #[cfg(not(feature = "worker-thread"))]
94 {
95 panic!(
96 "with_r_thread called from a non-main thread without the `worker-thread` feature.\n\
97 \n\
98 Without `worker-thread`, R API calls can only happen on the R main thread.\n\
99 Either:\n\
100 - Enable the `worker-thread` cargo feature to route calls from background threads, or\n\
101 - Ensure this code only runs on the R main thread."
102 );
103 }
104
105 #[cfg(feature = "worker-thread")]
106 {
107 worker_channel::route_to_main_thread(f)
108 }
109}
110#[doc(hidden)]
118pub fn panic_message_to_r_error(msg: String, call: Option<SEXP>) -> ! {
119 let c_msg = std::ffi::CString::new(msg)
120 .unwrap_or_else(|_| std::ffi::CString::new("Rust panic (invalid message)").unwrap());
121 unsafe {
122 match call {
123 Some(call) => ffi::Rf_errorcall_unchecked(call, c"%s".as_ptr(), c_msg.as_ptr()),
124 None => ffi::Rf_error_unchecked(c"%s".as_ptr(), c_msg.as_ptr()),
125 }
126 }
127}
128
/// Runs `f` as a worker job and returns its value, or an error message on failure.
///
/// With the `worker-thread` feature the job goes through
/// `worker_channel::dispatch_to_worker`; an `Err` result is reported to panic telemetry
/// before being returned. Without the feature, `f` simply runs inline on the calling
/// thread and its value is wrapped in `Ok`.
#[doc(hidden)]
pub fn run_on_worker<F, T>(f: F) -> Result<T, String>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    #[cfg(feature = "worker-thread")]
    {
        worker_channel::dispatch_to_worker(f).inspect_err(|msg| {
            crate::panic_telemetry::fire(msg, crate::panic_telemetry::PanicSource::Worker);
        })
    }

    #[cfg(not(feature = "worker-thread"))]
    {
        Ok(f())
    }
}
155
156#[doc(hidden)]
161#[unsafe(no_mangle)]
162pub extern "C-unwind" fn miniextendr_runtime_init() {
163 static RUN_ONCE: std::sync::Once = std::sync::Once::new();
164
165 #[cfg(feature = "worker-thread")]
166 {
167 RUN_ONCE.call_once_force(|x| {
168 if x.is_poisoned() {
169 eprintln!(
170 "warning: miniextendr worker init is retrying after a previous failed attempt"
171 );
172 }
173
174 let current_id = std::thread::current().id();
175 if let Some(&existing_id) = R_MAIN_THREAD_ID.get() {
176 if existing_id != current_id {
177 panic!(
178 "miniextendr_runtime_init called from thread {:?}, but R_MAIN_THREAD_ID \
179 was already set to {:?}. This indicates incorrect initialization order.",
180 current_id, existing_id
181 );
182 }
183 } else {
184 let _ = R_MAIN_THREAD_ID.set(current_id);
185 }
186
187 worker_channel::init_worker();
188 });
189 }
190
191 #[cfg(not(feature = "worker-thread"))]
192 {
193 RUN_ONCE.call_once(|| {
194 let _ = R_MAIN_THREAD_ID.set(std::thread::current().id());
195 });
196 }
197}
/// Reports whether the calling thread currently has a worker context installed.
///
/// Always `false` when the `worker-thread` feature is disabled.
pub(crate) fn has_worker_context() -> bool {
    #[cfg(feature = "worker-thread")]
    let in_worker = worker_channel::has_context();

    #[cfg(not(feature = "worker-thread"))]
    let in_worker = false;

    in_worker
}
213
214fn assert_runtime_initialized() {
216 if R_MAIN_THREAD_ID.get().is_none() {
217 panic!(
218 "miniextendr_runtime_init() must be called before using R FFI APIs.\n\
219 \n\
220 This is typically done in R_init_<pkgname>() via:\n\
221 \n\
222 void R_init_pkgname(DllInfo *dll) {{\n\
223 miniextendr_runtime_init();\n\
224 R_registerRoutines(dll, NULL, CallEntries, NULL, NULL);\n\
225 }}\n\
226 \n\
227 If you're embedding R in Rust, call miniextendr_runtime_init() from the main thread \
228 before any R API calls."
229 );
230 }
231}
232#[cfg(feature = "worker-thread")]
237mod worker_channel {
238 use std::any::Any;
239 use std::cell::RefCell;
240 use std::panic::{AssertUnwindSafe, catch_unwind};
241 use std::sync::mpsc::{self, Receiver, SyncSender};
242 use std::thread;
243
244 use super::Sendable;
245 use crate::ffi::{self, Rboolean, SEXP};
246
    /// Type-erased job executed on the worker thread.
    type AnyJob = Box<dyn FnOnce() + Send>;

    /// Sender used to submit jobs to the worker thread; set by `init_worker`.
    static JOB_TX: std::sync::OnceLock<SyncSender<AnyJob>> = std::sync::OnceLock::new();

    /// Closure the worker asks the dispatching thread to run. The closure itself carries
    /// no `Send` bound; the `Sendable` wrapper is what lets it travel through a channel.
    type MainThreadWork = Sendable<Box<dyn FnOnce() -> Box<dyn Any + Send> + 'static>>;

    /// Reply to a `MainThreadWork` request: the boxed return value, or an error message.
    type MainThreadResponse = Result<Box<dyn Any + Send>, String>;

    /// Messages sent from the worker thread to the thread running `dispatch_to_worker`.
    enum WorkerMessage<T> {
        /// Run this closure on the dispatching thread and send back its result.
        WorkRequest(MainThreadWork),
        /// The job is finished: its value, or the message of the panic that ended it.
        Done(Result<T, String>),
    }

    type TypeErasedWorkerMessage = WorkerMessage<Box<dyn Any + Send>>;
    type WorkerToMainSender = RefCell<Option<SyncSender<TypeErasedWorkerMessage>>>;
    type MainResponseReceiver = RefCell<Option<Receiver<MainThreadResponse>>>;

    thread_local! {
        // Both slots are `Some` only while a job is running on this thread;
        // `dispatch_to_worker`'s job closure installs them before the job and clears
        // them afterwards. `has_context` treats a filled sender slot as "inside a worker".
        static WORKER_TO_MAIN_TX: WorkerToMainSender = const { RefCell::new(None) };
        static MAIN_RESPONSE_RX: MainResponseReceiver = const { RefCell::new(None) };
    }
274
275 pub(super) fn has_context() -> bool {
276 WORKER_TO_MAIN_TX.with(|tx_cell| tx_cell.borrow().is_some())
277 }
278
279 pub(super) fn route_to_main_thread<F, R>(f: F) -> R
281 where
282 F: FnOnce() -> R + 'static,
283 R: Send + 'static,
284 {
285 WORKER_TO_MAIN_TX.with(|tx_cell| {
286 let tx = tx_cell
287 .borrow()
288 .as_ref()
289 .expect("`with_r_thread` called outside of `run_on_worker` context")
290 .clone();
291
292 let work: MainThreadWork =
293 Sendable(Box::new(move || Box::new(f()) as Box<dyn Any + Send>));
294
295 tx.send(WorkerMessage::WorkRequest(work))
296 .expect("main thread channel closed");
297 });
298
299 MAIN_RESPONSE_RX.with(|rx_cell| {
300 let rx = rx_cell.borrow();
301 let rx = rx.as_ref().expect("response channel not set");
302 let response = rx.recv().expect("main thread response channel closed");
303 match response {
304 Ok(boxed) => *boxed
305 .downcast::<R>()
306 .expect("type mismatch in `with_r_thread` response"),
307 Err(panic_msg) => panic!("panic in `with_r_thread`: {}", panic_msg),
308 }
309 })
310 }
311
    /// Submits `f` to the worker thread and services its requests until it finishes.
    ///
    /// While the job runs, the calling thread loops on the job's message channel:
    ///
    /// * `WorkRequest`: the closure is executed here, inside `R_UnwindProtect_C_unwind`,
    ///   and the outcome is sent back to the worker;
    /// * `Done`: the job's result is downcast to `T` and returned.
    ///
    /// Returns `Err(message)` when the job itself panicked.
    ///
    /// # Panics
    ///
    /// Panics when called from a thread that already has a worker context (re-entry),
    /// when the worker has not been initialized, or when a channel closes unexpectedly.
    pub(super) fn dispatch_to_worker<F, T>(f: F) -> Result<T, String>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        // Panic payload raised by `cleanup_handler` so the `catch_unwind` below can tell
        // "R is unwinding" apart from an ordinary Rust panic.
        struct RErrorMarker;

        if has_context() {
            panic!(
                "run_on_worker called re-entrantly from within a worker context.\n\
                 \n\
                 The single worker thread is already executing a job, so a nested \
                 run_on_worker would deadlock. To call R APIs from worker code, \
                 use with_r_thread() instead."
            );
        }

        let job_tx = JOB_TX
            .get()
            .expect("worker not initialized - call miniextendr_runtime_init first");

        // Worker -> this thread: work requests and the final `Done` message.
        let (worker_tx, worker_rx) = mpsc::sync_channel::<TypeErasedWorkerMessage>(1);

        // This thread -> worker: replies to work requests.
        let (response_tx, response_rx) = mpsc::sync_channel::<MainThreadResponse>(1);

        let job: AnyJob = Box::new(move || {
            // Install the per-job channels so `route_to_main_thread` can find them.
            WORKER_TO_MAIN_TX.with(|tx_cell| {
                *tx_cell.borrow_mut() = Some(worker_tx.clone());
            });
            MAIN_RESPONSE_RX.with(|rx_cell| {
                *rx_cell.borrow_mut() = Some(response_rx);
            });

            let result = catch_unwind(AssertUnwindSafe(f));

            // Clear the context before reporting, whether or not `f` panicked.
            WORKER_TO_MAIN_TX.with(|tx_cell| {
                *tx_cell.borrow_mut() = None;
            });
            MAIN_RESPONSE_RX.with(|rx_cell| {
                *rx_cell.borrow_mut() = None;
            });

            let to_send: Result<Box<dyn Any + Send>, String> = match result {
                Ok(val) => Ok(Box::new(val)),
                Err(payload) => Err(crate::unwind_protect::panic_payload_to_string(&*payload)),
            };
            // Best effort: a send failure is deliberately ignored here.
            let _ = worker_tx.send(WorkerMessage::Done(to_send));
        });

        job_tx.send(job).expect("worker thread dead");

        loop {
            match worker_rx
                .recv()
                .expect("worker channel closed unexpectedly")
            {
                WorkerMessage::WorkRequest(work) => {
                    // State shared with the two C callbacks through a raw pointer.
                    struct CallData {
                        // Closure to run; taken exactly once by `trampoline`.
                        work: Option<MainThreadWork>,
                        // Value returned by the closure, if it completed.
                        result: Option<Box<dyn Any + Send>>,
                        // Payload of a Rust panic raised by the closure, if any.
                        panic_payload: Option<Box<dyn Any + Send>>,
                        // Borrowed pointer to `response_tx`, used by `cleanup_handler`.
                        response_tx_ptr: *const SyncSender<MainThreadResponse>,
                    }

                    // Runs the closure and records its outcome in `CallData`. Rust panics
                    // are caught here and stored, so they do not leave this function.
                    unsafe extern "C-unwind" fn trampoline(data: *mut std::ffi::c_void) -> SEXP {
                        assert!(!data.is_null(), "trampoline: data pointer is null");
                        let data = unsafe { &mut *data.cast::<CallData>() };
                        let work = data
                            .work
                            .take()
                            .expect("trampoline: work already consumed")
                            .0;

                        match catch_unwind(AssertUnwindSafe(work)) {
                            Ok(result) => {
                                data.result = Some(result);
                                SEXP::nil()
                            }
                            Err(payload) => {
                                data.panic_payload = Some(payload);
                                SEXP::nil()
                            }
                        }
                    }

                    // Acts only when `jump` is not `FALSE` (presumably: R is performing a
                    // non-local exit — confirm against the R_UnwindProtect contract). It
                    // then reports the error to the worker and raises `RErrorMarker`.
                    unsafe extern "C-unwind" fn cleanup_handler(
                        data: *mut std::ffi::c_void,
                        jump: Rboolean,
                    ) {
                        if jump != Rboolean::FALSE {
                            assert!(!data.is_null(), "cleanup_handler: data pointer is null");
                            let data = unsafe { &*data.cast::<CallData>() };
                            let response_tx = unsafe { &*data.response_tx_ptr };

                            // With `nonapi`, use R's current error buffer as the message;
                            // otherwise only a generic text is available.
                            #[cfg(feature = "nonapi")]
                            let error_msg = unsafe {
                                let buf = ffi::R_curErrorBuf();
                                if buf.is_null() {
                                    "R error occurred".to_string()
                                } else {
                                    std::ffi::CStr::from_ptr(buf).to_string_lossy().into_owned()
                                }
                            };
                            #[cfg(not(feature = "nonapi"))]
                            let error_msg = "R error occurred".to_string();

                            // Reply to the worker first; this code path does not reach
                            // the normal `response_tx.send(response)` further down.
                            let _ = response_tx.send(Err(error_msg));
                            std::panic::panic_any(RErrorMarker);
                        }
                    }

                    let response: MainThreadResponse = unsafe {
                        let token = crate::unwind_protect::get_continuation_token();

                        // Heap-allocate so both callbacks can share one stable pointer.
                        let data = Box::into_raw(Box::new(CallData {
                            work: Some(work),
                            result: None,
                            panic_payload: None,
                            response_tx_ptr: std::ptr::from_ref(&response_tx),
                        }));

                        let panic_result = catch_unwind(AssertUnwindSafe(|| {
                            ffi::R_UnwindProtect_C_unwind(
                                Some(trampoline),
                                data.cast(),
                                Some(cleanup_handler),
                                data.cast(),
                                token,
                            )
                        }));

                        // Reclaim ownership so `CallData` is freed on every path below.
                        let mut data = Box::from_raw(data);

                        match panic_result {
                            Ok(_) => {
                                if let Some(payload) = data.panic_payload.take() {
                                    // The closure panicked; forward the message.
                                    Err(crate::unwind_protect::panic_payload_to_string(&*payload))
                                } else {
                                    Ok(data
                                        .result
                                        .take()
                                        .expect("result not set after successful completion"))
                                }
                            }
                            Err(payload) => {
                                if payload.downcast_ref::<RErrorMarker>().is_some() {
                                    // Free `CallData` explicitly before resuming R's unwind.
                                    // NOTE(review): `R_ContinueUnwind` presumably does not
                                    // return, so destructors after this point would be
                                    // skipped — confirm.
                                    drop(data);
                                    ffi::R_ContinueUnwind(token);
                                }
                                Err(crate::unwind_protect::panic_payload_to_string(&*payload))
                            }
                        }
                    };

                    response_tx
                        .send(response)
                        .expect("worker response channel closed");
                }
                WorkerMessage::Done(result) => {
                    return match result {
                        Ok(boxed) => Ok(*boxed
                            .downcast::<T>()
                            .expect("type mismatch in run_on_worker result")),
                        Err(msg) => Err(msg),
                    };
                }
            }
        }
    }
518
519 pub(super) fn init_worker() {
521 if JOB_TX.get().is_some() {
522 return; }
524 let (job_tx, job_rx) = mpsc::sync_channel::<AnyJob>(0);
527 thread::Builder::new()
528 .name("miniextendr-worker".into())
529 .spawn(move || {
530 while let Ok(job) = job_rx.recv() {
531 job();
532 }
533 })
534 .expect("failed to spawn worker thread");
535
536 JOB_TX.set(job_tx).expect("worker already initialized");
537 }
538}
539#[cfg(test)]
544mod tests {
545 use super::*;
546
547 #[test]
548 fn sendable_is_send() {
549 fn assert_send<T: Send>() {}
550 assert_send::<Sendable<*const u8>>();
551 }
552
553 #[test]
554 fn with_r_thread_panics_before_init() {
555 if R_MAIN_THREAD_ID.get().is_some() {
558 let handle = std::thread::spawn(|| std::panic::catch_unwind(|| with_r_thread(|| 42)));
559 let result = handle.join().expect("thread panicked outside catch_unwind");
560 assert!(
561 result.is_err(),
562 "with_r_thread should panic from non-main thread"
563 );
564 return;
565 }
566 let result = std::panic::catch_unwind(|| {
567 with_r_thread(|| 42);
568 });
569 assert!(result.is_err());
570 let payload = result.unwrap_err();
571 let msg = crate::unwind_protect::panic_payload_to_string(payload.as_ref());
572 assert!(
573 msg.contains("miniextendr_runtime_init"),
574 "expected init error message, got: {msg}"
575 );
576 }
577
578 #[test]
579 fn has_worker_context_false_outside_worker() {
580 assert!(!has_worker_context());
581 }
582
583 #[cfg(feature = "worker-thread")]
586 mod worker_tests {
587 use super::*;
588
589 #[test]
592 fn run_on_worker_reentry_panics_not_deadlocks() {
593 miniextendr_runtime_init();
594
595 let (tx, rx) = std::sync::mpsc::sync_channel::<Result<String, String>>(1);
596
597 std::thread::spawn(move || {
598 let result = run_on_worker(|| {
599 run_on_worker(|| 42).unwrap();
601 });
602 match result {
603 Err(msg) => {
604 let _ = tx.send(Ok(msg));
605 }
606 Ok(()) => {
607 let _ = tx.send(Err("re-entry was not detected".into()));
608 }
609 }
610 });
611
612 match rx.recv_timeout(std::time::Duration::from_secs(5)) {
613 Ok(Ok(msg)) => {
614 assert!(
615 msg.contains("re-entr") || msg.contains("Re-entr"),
616 "expected re-entry error, got: {msg}"
617 );
618 }
619 Ok(Err(msg)) => panic!("{msg}"),
620 Err(_) => {
621 panic!("DEADLOCK: run_on_worker re-entry caused the test to hang for 5 seconds")
622 }
623 }
624 }
625 }
626 #[cfg(not(feature = "worker-thread"))]
631 mod stub_tests {
632 use super::*;
633
634 #[test]
635 fn stub_with_r_thread_inline() {
636 miniextendr_runtime_init();
637 if !is_r_main_thread() {
641 return;
642 }
643 let result = with_r_thread(|| 42);
644 assert_eq!(result, 42);
645 }
646
647 #[test]
648 fn stub_run_on_worker_inline() {
649 let result = run_on_worker(|| 123);
650 assert_eq!(result, Ok(123));
651 }
652
653 #[test]
656 fn stub_with_r_thread_panics_on_wrong_thread() {
657 miniextendr_runtime_init();
658
659 let handle = std::thread::spawn(|| {
660 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| with_r_thread(|| 42)))
661 });
662
663 let result = handle.join().expect("thread panicked outside catch_unwind");
664 assert!(
665 result.is_err(),
666 "with_r_thread should panic when called from a non-main thread \
667 without the worker-thread feature, but it ran inline silently"
668 );
669 }
670 }
671 }
673