00054 #undef _FORTIFY_SOURCE
00055 #undef __USE_FORTIFY_LEVEL
00056 #define __USE_FORTIFY_LEVEL 0
00057
00058
00059
00060 #include "eval_intern.h"
00061 #include "gc.h"
00062 #include "timev.h"
00063 #include "ruby/io.h"
00064 #include "ruby/thread.h"
00065 #include "internal.h"
00066
00067 #ifndef USE_NATIVE_THREAD_PRIORITY
00068 #define USE_NATIVE_THREAD_PRIORITY 0
00069 #define RUBY_THREAD_PRIORITY_MAX 3
00070 #define RUBY_THREAD_PRIORITY_MIN -3
00071 #endif
00072
00073 #ifndef THREAD_DEBUG
00074 #define THREAD_DEBUG 0
00075 #endif
00076
00077 VALUE rb_cMutex;
00078 VALUE rb_cThreadShield;
00079
00080 static VALUE sym_immediate;
00081 static VALUE sym_on_blocking;
00082 static VALUE sym_never;
00083 static ID id_locals;
00084
00085 static void sleep_timeval(rb_thread_t *th, struct timeval time, int spurious_check);
00086 static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec, int spurious_check);
00087 static void sleep_forever(rb_thread_t *th, int nodeadlock, int spurious_check);
00088 static double timeofday(void);
00089 static int rb_threadptr_dead(rb_thread_t *th);
00090 static void rb_check_deadlock(rb_vm_t *vm);
00091 static int rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th);
00092
00093 #define eKillSignal INT2FIX(0)
00094 #define eTerminateSignal INT2FIX(1)
00095 static volatile int system_working = 1;
00096
00097 #define closed_stream_error GET_VM()->special_exceptions[ruby_error_closed_stream]
00098
00099 inline static void
00100 st_delete_wrap(st_table *table, st_data_t key)
00101 {
00102 st_delete(table, &key, 0);
00103 }
00104
00105
00106
00107 #define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION
00108
00109 struct rb_blocking_region_buffer {
00110 enum rb_thread_status prev_status;
00111 struct rb_unblock_callback oldubf;
00112 };
00113
00114 static int set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
00115 struct rb_unblock_callback *old, int fail_if_interrupted);
00116 static void reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old);
00117
00118 static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
00119 rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted);
00120 static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region);
00121
00122 #ifdef __ia64
00123 #define RB_GC_SAVE_MACHINE_REGISTER_STACK(th) \
00124 do{(th)->machine.register_stack_end = rb_ia64_bsp();}while(0)
00125 #else
00126 #define RB_GC_SAVE_MACHINE_REGISTER_STACK(th)
00127 #endif
00128 #define RB_GC_SAVE_MACHINE_CONTEXT(th) \
00129 do { \
00130 FLUSH_REGISTER_WINDOWS; \
00131 RB_GC_SAVE_MACHINE_REGISTER_STACK(th); \
00132 setjmp((th)->machine.regs); \
00133 SET_MACHINE_STACK_END(&(th)->machine.stack_end); \
00134 } while (0)
00135
00136 #define GVL_UNLOCK_BEGIN() do { \
00137 rb_thread_t *_th_stored = GET_THREAD(); \
00138 RB_GC_SAVE_MACHINE_CONTEXT(_th_stored); \
00139 gvl_release(_th_stored->vm);
00140
00141 #define GVL_UNLOCK_END() \
00142 gvl_acquire(_th_stored->vm, _th_stored); \
00143 rb_thread_set_current(_th_stored); \
00144 } while(0)
00145
00146 #ifdef __GNUC__
00147 #define only_if_constant(expr, notconst) (__builtin_constant_p(expr) ? (expr) : (notconst))
00148 #else
00149 #define only_if_constant(expr, notconst) notconst
00150 #endif
00151 #define BLOCKING_REGION(exec, ubf, ubfarg, fail_if_interrupted) do { \
00152 rb_thread_t *__th = GET_THREAD(); \
00153 struct rb_blocking_region_buffer __region; \
    if (blocking_region_begin(__th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \
	/* always enter the region unless fail_if_interrupted is a true constant */ \
	!only_if_constant(fail_if_interrupted, TRUE)) { \
00157 exec; \
00158 blocking_region_end(__th, &__region); \
00159 }; \
00160 } while(0)
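
/*
 * Illustrative sketch (not part of the implementation): a typical use of
 * BLOCKING_REGION releases the GVL around a blocking call and names an
 * unblocking function so the call can be cancelled.  The identifiers
 * `result`, `blocking_call`, `cancel_call` and `arg` below are hypothetical.
 *
 *   BLOCKING_REGION({
 *       result = blocking_call(arg);
 *   }, cancel_call, arg, FALSE);
 */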
00161
00162 #if THREAD_DEBUG
00163 #ifdef HAVE_VA_ARGS_MACRO
00164 void rb_thread_debug(const char *file, int line, const char *fmt, ...);
00165 #define thread_debug(fmt, ...) rb_thread_debug(__FILE__, __LINE__, fmt, ##__VA_ARGS__)
00166 #define POSITION_FORMAT "%s:%d:"
00167 #define POSITION_ARGS ,file, line
00168 #else
00169 void rb_thread_debug(const char *fmt, ...);
00170 #define thread_debug rb_thread_debug
00171 #define POSITION_FORMAT
00172 #define POSITION_ARGS
00173 #endif
00174
00175 # if THREAD_DEBUG < 0
00176 static int rb_thread_debug_enabled;
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186 static VALUE
00187 rb_thread_s_debug(void)
00188 {
00189 return INT2NUM(rb_thread_debug_enabled);
00190 }
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200 static VALUE
00201 rb_thread_s_debug_set(VALUE self, VALUE val)
00202 {
00203 rb_thread_debug_enabled = RTEST(val) ? NUM2INT(val) : 0;
00204 return val;
00205 }
00206 # else
00207 # define rb_thread_debug_enabled THREAD_DEBUG
00208 # endif
00209 #else
00210 #define thread_debug if(0)printf
00211 #endif
00212
00213 #ifndef __ia64
00214 #define thread_start_func_2(th, st, rst) thread_start_func_2(th, st)
00215 #endif
00216 NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start,
00217 VALUE *register_stack_start));
00218 static void timer_thread_function(void *);
00219
00220 #if defined(_WIN32)
00221 #include "thread_win32.c"
00222
00223 #define DEBUG_OUT() \
00224 WaitForSingleObject(&debug_mutex, INFINITE); \
00225 printf(POSITION_FORMAT"%p - %s" POSITION_ARGS, GetCurrentThreadId(), buf); \
00226 fflush(stdout); \
00227 ReleaseMutex(&debug_mutex);
00228
00229 #elif defined(HAVE_PTHREAD_H)
00230 #include "thread_pthread.c"
00231
00232 #define DEBUG_OUT() \
00233 pthread_mutex_lock(&debug_mutex); \
00234 printf(POSITION_FORMAT"%#"PRIxVALUE" - %s" POSITION_ARGS, (VALUE)pthread_self(), buf); \
00235 fflush(stdout); \
00236 pthread_mutex_unlock(&debug_mutex);
00237
00238 #else
00239 #error "unsupported thread type"
00240 #endif
00241
00242 #if THREAD_DEBUG
00243 static int debug_mutex_initialized = 1;
00244 static rb_nativethread_lock_t debug_mutex;
00245
00246 void
00247 rb_thread_debug(
00248 #ifdef HAVE_VA_ARGS_MACRO
00249 const char *file, int line,
00250 #endif
00251 const char *fmt, ...)
00252 {
00253 va_list args;
00254 char buf[BUFSIZ];
00255
00256 if (!rb_thread_debug_enabled) return;
00257
00258 if (debug_mutex_initialized == 1) {
00259 debug_mutex_initialized = 0;
00260 native_mutex_initialize(&debug_mutex);
00261 }
00262
00263 va_start(args, fmt);
00264 vsnprintf(buf, BUFSIZ, fmt, args);
00265 va_end(args);
00266
00267 DEBUG_OUT();
00268 }
00269 #endif
00270
00271 void
00272 rb_vm_gvl_destroy(rb_vm_t *vm)
00273 {
00274 gvl_release(vm);
00275 gvl_destroy(vm);
00276 native_mutex_destroy(&vm->thread_destruct_lock);
00277 }
00278
00279 void
00280 rb_nativethread_lock_initialize(rb_nativethread_lock_t *lock)
00281 {
00282 native_mutex_initialize(lock);
00283 }
00284
00285 void
00286 rb_nativethread_lock_destroy(rb_nativethread_lock_t *lock)
00287 {
00288 native_mutex_destroy(lock);
00289 }
00290
00291 void
00292 rb_nativethread_lock_lock(rb_nativethread_lock_t *lock)
00293 {
00294 native_mutex_lock(lock);
00295 }
00296
00297 void
00298 rb_nativethread_lock_unlock(rb_nativethread_lock_t *lock)
00299 {
00300 native_mutex_unlock(lock);
00301 }
00302
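/*
 * Register `func`/`arg` as the unblocking callback for `th`.  The interrupt
 * flag is re-checked under interrupt_lock so that an interrupt arriving just
 * before registration is not lost; with fail_if_interrupted set, the function
 * gives up and returns FALSE instead of servicing the interrupt.
 */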
00303 static int
00304 set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
00305 struct rb_unblock_callback *old, int fail_if_interrupted)
00306 {
00307 check_ints:
00308 if (fail_if_interrupted) {
00309 if (RUBY_VM_INTERRUPTED_ANY(th)) {
00310 return FALSE;
00311 }
00312 }
00313 else {
00314 RUBY_VM_CHECK_INTS(th);
00315 }
00316
00317 native_mutex_lock(&th->interrupt_lock);
00318 if (RUBY_VM_INTERRUPTED_ANY(th)) {
00319 native_mutex_unlock(&th->interrupt_lock);
00320 goto check_ints;
00321 }
00322 else {
00323 if (old) *old = th->unblock;
00324 th->unblock.func = func;
00325 th->unblock.arg = arg;
00326 }
00327 native_mutex_unlock(&th->interrupt_lock);
00328
00329 return TRUE;
00330 }
00331
00332 static void
00333 reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old)
00334 {
00335 native_mutex_lock(&th->interrupt_lock);
00336 th->unblock = *old;
00337 native_mutex_unlock(&th->interrupt_lock);
00338 }
00339
00340 static void
00341 rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
00342 {
00343 native_mutex_lock(&th->interrupt_lock);
00344 if (trap)
00345 RUBY_VM_SET_TRAP_INTERRUPT(th);
00346 else
00347 RUBY_VM_SET_INTERRUPT(th);
00348 if (th->unblock.func) {
00349 (th->unblock.func)(th->unblock.arg);
00350 }
00351 else {
	/* nothing to cancel: the thread is not in a blocking region */
00353 }
00354 native_cond_signal(&th->interrupt_cond);
00355 native_mutex_unlock(&th->interrupt_lock);
00356 }
00357
00358 void
00359 rb_threadptr_interrupt(rb_thread_t *th)
00360 {
00361 rb_threadptr_interrupt_common(th, 0);
00362 }
00363
00364 void
00365 rb_threadptr_trap_interrupt(rb_thread_t *th)
00366 {
00367 rb_threadptr_interrupt_common(th, 1);
00368 }
00369
00370 static int
00371 terminate_i(st_data_t key, st_data_t val, rb_thread_t *main_thread)
00372 {
00373 VALUE thval = key;
00374 rb_thread_t *th;
00375 GetThreadPtr(thval, th);
00376
00377 if (th != main_thread) {
00378 thread_debug("terminate_i: %p\n", (void *)th);
00379 rb_threadptr_pending_interrupt_enque(th, eTerminateSignal);
00380 rb_threadptr_interrupt(th);
00381 }
00382 else {
00383 thread_debug("terminate_i: main thread (%p)\n", (void *)th);
00384 }
00385 return ST_CONTINUE;
00386 }
00387
00388 typedef struct rb_mutex_struct
00389 {
00390 rb_nativethread_lock_t lock;
00391 rb_nativethread_cond_t cond;
00392 struct rb_thread_struct volatile *th;
00393 struct rb_mutex_struct *next_mutex;
00394 int cond_waiting;
00395 int allow_trap;
00396 } rb_mutex_t;
00397
00398 static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
00399 static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
00400 static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
00401 static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th);
00402
00403 void
00404 rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
00405 {
00406 const char *err;
00407 rb_mutex_t *mutex;
00408 rb_mutex_t *mutexes = th->keeping_mutexes;
00409
00410 while (mutexes) {
00411 mutex = mutexes;
00412
00413
00414 mutexes = mutex->next_mutex;
00415 err = rb_mutex_unlock_th(mutex, th);
00416 if (err) rb_bug("invalid keeping_mutexes: %s", err);
00417 }
00418 }
00419
00420 void
00421 rb_thread_terminate_all(void)
00422 {
00423 rb_thread_t *th = GET_THREAD();
00424 rb_vm_t *vm = th->vm;
00425
00426 if (vm->main_thread != th) {
00427 rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)",
00428 (void *)vm->main_thread, (void *)th);
00429 }
00430
00431
00432 rb_threadptr_unlock_all_locking_mutexes(th);
00433
00434 retry:
00435 thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th);
00436 st_foreach(vm->living_threads, terminate_i, (st_data_t)th);
00437
00438 while (!rb_thread_alone()) {
00439 int state;
00440
00441 TH_PUSH_TAG(th);
00442 if ((state = TH_EXEC_TAG()) == 0) {
	    /*
	     * Each exiting sub-thread interrupts the main thread when it is
	     * the last one left (see thread_start_func_2), which wakes this
	     * sleep up.
	     */
00447 native_sleep(th, 0);
00448 RUBY_VM_CHECK_INTS_BLOCKING(th);
00449 }
00450 TH_POP_TAG();
00451
	/*
	 * If an exception (e.g. Ctrl+C) interrupted the wait, broadcast the
	 * kill request again so threads blocked in sleep, mutex, etc. are
	 * still terminated.
	 */
00457 if (state) {
00458 goto retry;
00459 }
00460 }
00461 }
00462
00463 static void
00464 thread_cleanup_func_before_exec(void *th_ptr)
00465 {
00466 rb_thread_t *th = th_ptr;
00467 th->status = THREAD_KILLED;
00468 th->machine.stack_start = th->machine.stack_end = 0;
00469 #ifdef __ia64
00470 th->machine.register_stack_start = th->machine.register_stack_end = 0;
00471 #endif
00472 }
00473
00474 static void
00475 thread_cleanup_func(void *th_ptr, int atfork)
00476 {
00477 rb_thread_t *th = th_ptr;
00478
00479 th->locking_mutex = Qfalse;
00480 thread_cleanup_func_before_exec(th_ptr);
    /*
     * At fork, native threading resources cannot be released safely: libc's
     * locking state may be inconsistent in the child, so touching them could
     * deadlock.  Skip the native cleanup in that case.
     */
00487 if (atfork)
00488 return;
00489
00490 native_mutex_destroy(&th->interrupt_lock);
00491 native_thread_destroy(th);
00492 }
00493
00494 static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *);
00495
00496 void
00497 ruby_thread_init_stack(rb_thread_t *th)
00498 {
00499 native_thread_init_stack(th);
00500 }
00501
00502 static int
00503 thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start)
00504 {
00505 int state;
00506 VALUE args = th->first_args;
00507 rb_proc_t *proc;
00508 rb_thread_list_t *join_list;
00509 rb_thread_t *main_th;
00510 VALUE errinfo = Qnil;
00511 # ifdef USE_SIGALTSTACK
00512 void rb_register_sigaltstack(rb_thread_t *th);
00513
00514 rb_register_sigaltstack(th);
00515 # endif
00516
00517 if (th == th->vm->main_thread)
00518 rb_bug("thread_start_func_2 must not be used for main thread");
00519
00520 ruby_thread_set_native(th);
00521
00522 th->machine.stack_start = stack_start;
00523 #ifdef __ia64
00524 th->machine.register_stack_start = register_stack_start;
00525 #endif
00526 thread_debug("thread start: %p\n", (void *)th);
00527
00528 gvl_acquire(th->vm, th);
00529 {
00530 thread_debug("thread start (get lock): %p\n", (void *)th);
00531 rb_thread_set_current(th);
00532
00533 TH_PUSH_TAG(th);
00534 if ((state = EXEC_TAG()) == 0) {
00535 SAVE_ROOT_JMPBUF(th, {
00536 if (!th->first_func) {
00537 GetProcPtr(th->first_proc, proc);
00538 th->errinfo = Qnil;
00539 th->root_lep = rb_vm_ep_local_ep(proc->block.ep);
00540 th->root_svar = Qnil;
00541 EXEC_EVENT_HOOK(th, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, Qundef);
00542 th->value = rb_vm_invoke_proc(th, proc, (int)RARRAY_LEN(args), RARRAY_CONST_PTR(args), 0);
00543 EXEC_EVENT_HOOK(th, RUBY_EVENT_THREAD_END, th->self, 0, 0, Qundef);
00544 }
00545 else {
00546 th->value = (*th->first_func)((void *)args);
00547 }
00548 });
00549 }
00550 else {
00551 errinfo = th->errinfo;
	    if (state == TAG_FATAL) {
		/* fatal error within this thread; errinfo is a tag, not an
		 * exception object, so nothing is re-raised below */
	    }
	    else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
		/* keep errinfo so SystemExit is re-raised in the main thread */
	    }
	    else if (th->vm->thread_abort_on_exception ||
		     th->abort_on_exception || RTEST(ruby_debug)) {
		/* abort_on_exception: keep errinfo and propagate it to the
		 * main thread */
	    }
	    else {
		/* otherwise the exception dies with this thread */
		errinfo = Qnil;
	    }
00565 th->value = Qnil;
00566 }
00567
00568 th->status = THREAD_KILLED;
00569 thread_debug("thread end: %p\n", (void *)th);
00570
00571 main_th = th->vm->main_thread;
00572 if (main_th == th) {
00573 ruby_stop(0);
00574 }
	if (RB_TYPE_P(errinfo, T_OBJECT)) {
	    /* treat it as a normal exception object and re-raise it in the
	     * main thread */
	    rb_threadptr_raise(main_th, 1, &errinfo);
	}
00579 TH_POP_TAG();
00580
00581
00582 if (th->locking_mutex != Qfalse) {
00583 rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")",
00584 (void *)th, th->locking_mutex);
00585 }

	/* remove this thread from the living_threads table */
	st_delete_wrap(th->vm->living_threads, th->self);
	if (rb_thread_alone()) {
	    /* this was the last sub-thread: wake the main thread, which may
	     * be sleeping in rb_thread_terminate_all() */
	    rb_threadptr_interrupt(main_th);
	}

	/* wake up joining threads */
	join_list = th->join_list;
00596 while (join_list) {
00597 rb_threadptr_interrupt(join_list->th);
00598 switch (join_list->th->status) {
00599 case THREAD_STOPPED: case THREAD_STOPPED_FOREVER:
00600 join_list->th->status = THREAD_RUNNABLE;
00601 default: break;
00602 }
00603 join_list = join_list->next;
00604 }
00605
00606 rb_threadptr_unlock_all_locking_mutexes(th);
00607 rb_check_deadlock(th->vm);
00608
00609 if (!th->root_fiber) {
00610 rb_thread_recycle_stack_release(th->stack);
00611 th->stack = 0;
00612 }
00613 }
00614 native_mutex_lock(&th->vm->thread_destruct_lock);
    /* make sure vm->running_thread never points to this thread after here */
00616 th->vm->running_thread = NULL;
00617 native_mutex_unlock(&th->vm->thread_destruct_lock);
00618 thread_cleanup_func(th, FALSE);
00619 gvl_release(th->vm);
00620
00621 return 0;
00622 }
00623
00624 static VALUE
00625 thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS))
00626 {
00627 rb_thread_t *th, *current_th = GET_THREAD();
00628 int err;
00629
00630 if (OBJ_FROZEN(GET_THREAD()->thgroup)) {
00631 rb_raise(rb_eThreadError,
00632 "can't start a new thread (frozen ThreadGroup)");
00633 }
00634 GetThreadPtr(thval, th);
00635
00636
00637 th->first_func = fn;
00638 th->first_proc = fn ? Qfalse : rb_block_proc();
00639 th->first_args = args;
00640
00641 th->priority = current_th->priority;
00642 th->thgroup = current_th->thgroup;
00643
00644 th->pending_interrupt_queue = rb_ary_tmp_new(0);
00645 th->pending_interrupt_queue_checked = 0;
00646 th->pending_interrupt_mask_stack = rb_ary_dup(current_th->pending_interrupt_mask_stack);
00647 RBASIC_CLEAR_CLASS(th->pending_interrupt_mask_stack);
00648
00649 th->interrupt_mask = 0;
00650
00651 native_mutex_initialize(&th->interrupt_lock);
00652 native_cond_initialize(&th->interrupt_cond, RB_CONDATTR_CLOCK_MONOTONIC);
00653
00654
00655 err = native_thread_create(th);
00656 if (err) {
00657 th->status = THREAD_KILLED;
00658 rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err));
00659 }
00660 st_insert(th->vm->living_threads, thval, (st_data_t) th->thread_id);
00661 return thval;
00662 }
00663
00664
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682
00683
00684 static VALUE
00685 thread_s_new(int argc, VALUE *argv, VALUE klass)
00686 {
00687 rb_thread_t *th;
00688 VALUE thread = rb_thread_alloc(klass);
00689
00690 if (GET_VM()->main_thread->status == THREAD_KILLED)
00691 rb_raise(rb_eThreadError, "can't alloc thread");
00692
00693 rb_obj_call_init(thread, argc, argv);
00694 GetThreadPtr(thread, th);
00695 if (!th->first_args) {
00696 rb_raise(rb_eThreadError, "uninitialized thread - check `%s#initialize'",
00697 rb_class2name(klass));
00698 }
00699 return thread;
00700 }
00701
00702
00703
00704
00705
00706
00707
00708
00709
00710
00711
00712 static VALUE
00713 thread_start(VALUE klass, VALUE args)
00714 {
00715 return thread_create_core(rb_thread_alloc(klass), args, 0);
00716 }
00717
00718
00719 static VALUE
00720 thread_initialize(VALUE thread, VALUE args)
00721 {
00722 rb_thread_t *th;
00723 if (!rb_block_given_p()) {
00724 rb_raise(rb_eThreadError, "must be called with a block");
00725 }
00726 GetThreadPtr(thread, th);
00727 if (th->first_args) {
00728 VALUE proc = th->first_proc, line, loc;
00729 const char *file;
00730 if (!proc || !RTEST(loc = rb_proc_location(proc))) {
00731 rb_raise(rb_eThreadError, "already initialized thread");
00732 }
00733 file = RSTRING_PTR(RARRAY_AREF(loc, 0));
00734 if (NIL_P(line = RARRAY_AREF(loc, 1))) {
00735 rb_raise(rb_eThreadError, "already initialized thread - %s",
00736 file);
00737 }
00738 rb_raise(rb_eThreadError, "already initialized thread - %s:%d",
00739 file, NUM2INT(line));
00740 }
00741 return thread_create_core(thread, args, 0);
00742 }
00743
00744 VALUE
00745 rb_thread_create(VALUE (*fn)(ANYARGS), void *arg)
00746 {
00747 return thread_create_core(rb_thread_alloc(rb_cThread), (VALUE)arg, fn);
00748 }
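
/*
 * Usage sketch for embedders and C extensions (illustrative only; the names
 * `worker` and `ctx` are hypothetical):
 *
 *   static VALUE worker(void *ctx) { ... return Qnil; }
 *   ...
 *   VALUE th = rb_thread_create(worker, ctx);
 *   rb_funcall(th, rb_intern("join"), 0);
 */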
00749
00750
00751
00752 #define DELAY_INFTY 1E30
00753
00754 struct join_arg {
00755 rb_thread_t *target, *waiting;
00756 double limit;
00757 int forever;
00758 };
00759
00760 static VALUE
00761 remove_from_join_list(VALUE arg)
00762 {
00763 struct join_arg *p = (struct join_arg *)arg;
00764 rb_thread_t *target_th = p->target, *th = p->waiting;
00765
00766 if (target_th->status != THREAD_KILLED) {
00767 rb_thread_list_t **p = &target_th->join_list;
00768
00769 while (*p) {
00770 if ((*p)->th == th) {
00771 *p = (*p)->next;
00772 break;
00773 }
00774 p = &(*p)->next;
00775 }
00776 }
00777
00778 return Qnil;
00779 }
00780
00781 static VALUE
00782 thread_join_sleep(VALUE arg)
00783 {
00784 struct join_arg *p = (struct join_arg *)arg;
00785 rb_thread_t *target_th = p->target, *th = p->waiting;
00786 double now, limit = p->limit;
00787
00788 while (target_th->status != THREAD_KILLED) {
00789 if (p->forever) {
00790 sleep_forever(th, 1, 0);
00791 }
00792 else {
00793 now = timeofday();
00794 if (now > limit) {
00795 thread_debug("thread_join: timeout (thid: %p)\n",
00796 (void *)target_th->thread_id);
00797 return Qfalse;
00798 }
00799 sleep_wait_for_interrupt(th, limit - now, 0);
00800 }
00801 thread_debug("thread_join: interrupted (thid: %p)\n",
00802 (void *)target_th->thread_id);
00803 }
00804 return Qtrue;
00805 }
00806
00807 static VALUE
00808 thread_join(rb_thread_t *target_th, double delay)
00809 {
00810 rb_thread_t *th = GET_THREAD();
00811 struct join_arg arg;
00812
00813 if (th == target_th) {
00814 rb_raise(rb_eThreadError, "Target thread must not be current thread");
00815 }
00816 if (GET_VM()->main_thread == target_th) {
00817 rb_raise(rb_eThreadError, "Target thread must not be main thread");
00818 }
00819
00820 arg.target = target_th;
00821 arg.waiting = th;
00822 arg.limit = timeofday() + delay;
00823 arg.forever = delay == DELAY_INFTY;
00824
00825 thread_debug("thread_join (thid: %p)\n", (void *)target_th->thread_id);
00826
00827 if (target_th->status != THREAD_KILLED) {
00828 rb_thread_list_t list;
00829 list.next = target_th->join_list;
00830 list.th = th;
00831 target_th->join_list = &list;
00832 if (!rb_ensure(thread_join_sleep, (VALUE)&arg,
00833 remove_from_join_list, (VALUE)&arg)) {
00834 return Qnil;
00835 }
00836 }
00837
00838 thread_debug("thread_join: success (thid: %p)\n",
00839 (void *)target_th->thread_id);
00840
00841 if (target_th->errinfo != Qnil) {
00842 VALUE err = target_th->errinfo;
00843
	if (FIXNUM_P(err)) {
	    /* the target died from a pseudo-exception (kill/terminate);
	     * there is nothing to re-raise */
	}
	else if (RB_TYPE_P(target_th->errinfo, T_NODE)) {
	    rb_exc_raise(rb_vm_make_jump_tag_but_local_jump(
		GET_THROWOBJ_STATE(err), GET_THROWOBJ_VAL(err)));
	}
	else {
	    /* normal exception: re-raise it in the joining thread */
	    rb_exc_raise(err);
00854 }
00855 }
00856 return target_th->self;
00857 }
00858
00859
00860
00861
00862
00863
00864
00865
00866
00867
00868
00869
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879
00880
00881
00882
00883
00884
00885
00886
00887
00888
00889
00890
00891
00892
00893
00894
00895
00896
00897
00898 static VALUE
00899 thread_join_m(int argc, VALUE *argv, VALUE self)
00900 {
00901 rb_thread_t *target_th;
00902 double delay = DELAY_INFTY;
00903 VALUE limit;
00904
00905 GetThreadPtr(self, target_th);
00906
00907 rb_scan_args(argc, argv, "01", &limit);
00908 if (!NIL_P(limit)) {
00909 delay = rb_num2dbl(limit);
00910 }
00911
00912 return thread_join(target_th, delay);
00913 }
00914
00915
00916
00917
00918
00919
00920
00921
00922
00923
00924
00925
00926
00927
00928
00929 static VALUE
00930 thread_value(VALUE self)
00931 {
00932 rb_thread_t *th;
00933 GetThreadPtr(self, th);
00934 thread_join(th, DELAY_INFTY);
00935 return th->value;
00936 }
00937
00938
00939
00940
00941
00942
00943
00944
00945
00946
00947
00948
00949
00950
00951
00952
00953
00954
00955 #if SIGNEDNESS_OF_TIME_T < 0
00956 # define TIMEVAL_SEC_MAX SIGNED_INTEGER_MAX(TYPEOF_TIMEVAL_TV_SEC)
00957 # define TIMEVAL_SEC_MIN SIGNED_INTEGER_MIN(TYPEOF_TIMEVAL_TV_SEC)
00958 #elif SIGNEDNESS_OF_TIME_T > 0
00959 # define TIMEVAL_SEC_MAX ((TYPEOF_TIMEVAL_TV_SEC)(~(unsigned_time_t)0))
00960 # define TIMEVAL_SEC_MIN ((TYPEOF_TIMEVAL_TV_SEC)0)
00961 #endif
00962
00963 static struct timeval
00964 double2timeval(double d)
00965 {
    /* assumes timeval.tv_sec has the same signedness as time_t */
00967 const double TIMEVAL_SEC_MAX_PLUS_ONE = (2*(double)(TIMEVAL_SEC_MAX/2+1));
00968
00969 struct timeval time;
00970
00971 if (TIMEVAL_SEC_MAX_PLUS_ONE <= d) {
00972 time.tv_sec = TIMEVAL_SEC_MAX;
00973 time.tv_usec = 999999;
00974 }
00975 else if (d <= TIMEVAL_SEC_MIN) {
00976 time.tv_sec = TIMEVAL_SEC_MIN;
00977 time.tv_usec = 0;
00978 }
00979 else {
00980 time.tv_sec = (TYPEOF_TIMEVAL_TV_SEC)d;
00981 time.tv_usec = (int)((d - (time_t)d) * 1e6);
00982 if (time.tv_usec < 0) {
00983 time.tv_usec += (int)1e6;
00984 time.tv_sec -= 1;
00985 }
00986 }
00987 return time;
00988 }
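
/*
 * Worked example of the conversion above: d = 1.25 yields tv_sec = 1,
 * tv_usec = 250000; d = -0.25 yields tv_sec = -1, tv_usec = 750000, because
 * a negative microsecond remainder is normalized into [0, 1000000).
 */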
00989
00990 static void
00991 sleep_forever(rb_thread_t *th, int deadlockable, int spurious_check)
00992 {
00993 enum rb_thread_status prev_status = th->status;
00994 enum rb_thread_status status = deadlockable ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
00995
00996 th->status = status;
00997 RUBY_VM_CHECK_INTS_BLOCKING(th);
00998 while (th->status == status) {
00999 if (deadlockable) {
01000 th->vm->sleeper++;
01001 rb_check_deadlock(th->vm);
01002 }
01003 native_sleep(th, 0);
01004 if (deadlockable) {
01005 th->vm->sleeper--;
01006 }
01007 RUBY_VM_CHECK_INTS_BLOCKING(th);
01008 if (!spurious_check)
01009 break;
01010 }
01011 th->status = prev_status;
01012 }
01013
01014 static void
01015 getclockofday(struct timeval *tp)
01016 {
01017 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
01018 struct timespec ts;
01019
01020 if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) {
01021 tp->tv_sec = ts.tv_sec;
01022 tp->tv_usec = ts.tv_nsec / 1000;
01023 } else
01024 #endif
01025 {
01026 gettimeofday(tp, NULL);
01027 }
01028 }
01029
01030 static void
01031 sleep_timeval(rb_thread_t *th, struct timeval tv, int spurious_check)
01032 {
01033 struct timeval to, tvn;
01034 enum rb_thread_status prev_status = th->status;
01035
01036 getclockofday(&to);
01037 if (TIMEVAL_SEC_MAX - tv.tv_sec < to.tv_sec)
01038 to.tv_sec = TIMEVAL_SEC_MAX;
01039 else
01040 to.tv_sec += tv.tv_sec;
01041 if ((to.tv_usec += tv.tv_usec) >= 1000000) {
01042 if (to.tv_sec == TIMEVAL_SEC_MAX)
01043 to.tv_usec = 999999;
01044 else {
01045 to.tv_sec++;
01046 to.tv_usec -= 1000000;
01047 }
01048 }
01049
01050 th->status = THREAD_STOPPED;
01051 RUBY_VM_CHECK_INTS_BLOCKING(th);
01052 while (th->status == THREAD_STOPPED) {
01053 native_sleep(th, &tv);
01054 RUBY_VM_CHECK_INTS_BLOCKING(th);
01055 getclockofday(&tvn);
01056 if (to.tv_sec < tvn.tv_sec) break;
01057 if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break;
01058 thread_debug("sleep_timeval: %"PRI_TIMET_PREFIX"d.%.6ld > %"PRI_TIMET_PREFIX"d.%.6ld\n",
01059 (time_t)to.tv_sec, (long)to.tv_usec,
01060 (time_t)tvn.tv_sec, (long)tvn.tv_usec);
01061 tv.tv_sec = to.tv_sec - tvn.tv_sec;
01062 if ((tv.tv_usec = to.tv_usec - tvn.tv_usec) < 0) {
01063 --tv.tv_sec;
01064 tv.tv_usec += 1000000;
01065 }
01066 if (!spurious_check)
01067 break;
01068 }
01069 th->status = prev_status;
01070 }
01071
01072 void
01073 rb_thread_sleep_forever(void)
01074 {
01075 thread_debug("rb_thread_sleep_forever\n");
01076 sleep_forever(GET_THREAD(), 0, 1);
01077 }
01078
01079 void
01080 rb_thread_sleep_deadly(void)
01081 {
01082 thread_debug("rb_thread_sleep_deadly\n");
01083 sleep_forever(GET_THREAD(), 1, 1);
01084 }
01085
01086 static double
01087 timeofday(void)
01088 {
01089 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
01090 struct timespec tp;
01091
01092 if (clock_gettime(CLOCK_MONOTONIC, &tp) == 0) {
01093 return (double)tp.tv_sec + (double)tp.tv_nsec * 1e-9;
01094 } else
01095 #endif
01096 {
01097 struct timeval tv;
01098 gettimeofday(&tv, NULL);
01099 return (double)tv.tv_sec + (double)tv.tv_usec * 1e-6;
01100 }
01101 }
01102
01103 static void
01104 sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec, int spurious_check)
01105 {
01106 sleep_timeval(th, double2timeval(sleepsec), spurious_check);
01107 }
01108
01109 static void
01110 sleep_for_polling(rb_thread_t *th)
01111 {
01112 struct timeval time;
01113 time.tv_sec = 0;
01114 time.tv_usec = 100 * 1000;
01115 sleep_timeval(th, time, 1);
01116 }
01117
01118 void
01119 rb_thread_wait_for(struct timeval time)
01120 {
01121 rb_thread_t *th = GET_THREAD();
01122 sleep_timeval(th, time, 1);
01123 }
01124
01125 void
01126 rb_thread_polling(void)
01127 {
01128 if (!rb_thread_alone()) {
01129 rb_thread_t *th = GET_THREAD();
01130 RUBY_VM_CHECK_INTS_BLOCKING(th);
01131 sleep_for_polling(th);
01132 }
01133 }
01134
01135
01136
01137
01138
01139
01140
01141
01142 void
01143 rb_thread_check_ints(void)
01144 {
01145 RUBY_VM_CHECK_INTS_BLOCKING(GET_THREAD());
01146 }
01147
01148
01149
01150
01151
01152 int
01153 rb_thread_check_trap_pending(void)
01154 {
01155 return rb_signal_buff_size() != 0;
01156 }
01157
01158
01159 int
01160 rb_thread_interrupted(VALUE thval)
01161 {
01162 rb_thread_t *th;
01163 GetThreadPtr(thval, th);
01164 return (int)RUBY_VM_INTERRUPTED(th);
01165 }
01166
01167 void
01168 rb_thread_sleep(int sec)
01169 {
01170 rb_thread_wait_for(rb_time_timeval(INT2FIX(sec)));
01171 }
01172
01173 static void
01174 rb_thread_schedule_limits(unsigned long limits_us)
01175 {
01176 thread_debug("rb_thread_schedule\n");
01177 if (!rb_thread_alone()) {
01178 rb_thread_t *th = GET_THREAD();
01179
01180 if (th->running_time_us >= limits_us) {
01181 thread_debug("rb_thread_schedule/switch start\n");
01182 RB_GC_SAVE_MACHINE_CONTEXT(th);
01183 gvl_yield(th->vm, th);
01184 rb_thread_set_current(th);
01185 thread_debug("rb_thread_schedule/switch done\n");
01186 }
01187 }
01188 }
01189
01190 void
01191 rb_thread_schedule(void)
01192 {
01193 rb_thread_t *cur_th = GET_THREAD();
01194 rb_thread_schedule_limits(0);
01195
01196 if (UNLIKELY(RUBY_VM_INTERRUPTED_ANY(cur_th))) {
01197 rb_threadptr_execute_interrupts(cur_th, 0);
01198 }
01199 }
01200
01201
01202
static inline int
blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
		      rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted)
{
    region->prev_status = th->status;
    if (set_unblock_function(th, ubf, arg, &region->oldubf, fail_if_interrupted)) {
	th->blocking_region_buffer = region;
	th->status = THREAD_STOPPED;
	thread_debug("enter blocking region (%p)\n", (void *)th);
	RB_GC_SAVE_MACHINE_CONTEXT(th);
	gvl_release(th->vm);
	return TRUE;
    }
    else {
	return FALSE;
    }
}
01220
static inline void
blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
{
    gvl_acquire(th->vm, th);
    rb_thread_set_current(th);
    thread_debug("leave blocking region (%p)\n", (void *)th);
    remove_signal_thread_list(th);
    th->blocking_region_buffer = 0;
    reset_unblock_function(th, &region->oldubf);
    if (th->status == THREAD_STOPPED) {
	th->status = region->prev_status;
    }
}
01234
01235 struct rb_blocking_region_buffer *
01236 rb_thread_blocking_region_begin(void)
01237 {
01238 rb_thread_t *th = GET_THREAD();
01239 struct rb_blocking_region_buffer *region = ALLOC(struct rb_blocking_region_buffer);
01240 blocking_region_begin(th, region, ubf_select, th, FALSE);
01241 return region;
01242 }
01243
01244 void
01245 rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region)
01246 {
01247 int saved_errno = errno;
01248 rb_thread_t *th = ruby_thread_from_native();
01249 blocking_region_end(th, region);
01250 xfree(region);
01251 RUBY_VM_CHECK_INTS_BLOCKING(th);
01252 errno = saved_errno;
01253 }
01254
01255 static void *
01256 call_without_gvl(void *(*func)(void *), void *data1,
01257 rb_unblock_function_t *ubf, void *data2, int fail_if_interrupted)
01258 {
01259 void *val = 0;
01260
01261 rb_thread_t *th = GET_THREAD();
01262 int saved_errno = 0;
01263
01264 th->waiting_fd = -1;
01265 if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
01266 ubf = ubf_select;
01267 data2 = th;
01268 }
01269
01270 BLOCKING_REGION({
01271 val = func(data1);
01272 saved_errno = errno;
01273 }, ubf, data2, fail_if_interrupted);
01274
01275 if (!fail_if_interrupted) {
01276 RUBY_VM_CHECK_INTS_BLOCKING(th);
01277 }
01278
01279 errno = saved_errno;
01280
01281 return val;
01282 }
01283
01284
01285
01286
01287
01288
01289
01290
01291
01292
01293
01294
01295
01296
01297
01298
01299
01300
01301
01302
01303
01304
01305
01306
01307
01308
01309
01310
01311
01312
01313
01314
01315
01316
01317
01318
01319
01320
01321
01322
01323
01324
01325
01326
01327
01328
01329
01330
01331
01332
01333
01334
01335
01336
01337
01338
01339
01340
01341
01342
01343
01344
01345
01346
01347
01348
01349
01350
01351
01352
01353
01354
01355
01356
01357
01358
01359
01360
01361
01362
01363
01364
01365
01366
01367
01368
01369 void *
01370 rb_thread_call_without_gvl2(void *(*func)(void *), void *data1,
01371 rb_unblock_function_t *ubf, void *data2)
01372 {
01373 return call_without_gvl(func, data1, ubf, data2, TRUE);
01374 }
01375
01376 void *
01377 rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
01378 rb_unblock_function_t *ubf, void *data2)
01379 {
01380 return call_without_gvl(func, data1, ubf, data2, FALSE);
01381 }
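
/*
 * Illustrative sketch of calling rb_thread_call_without_gvl() from an
 * extension (the names `blocking_read`, `cancel_read` and `req` are
 * hypothetical):
 *
 *   static void *blocking_read(void *p) { struct req *r = p; ... return NULL; }
 *   static void cancel_read(void *p)    { struct req *r = p; ... }
 *   ...
 *   rb_thread_call_without_gvl(blocking_read, &req, cancel_read, &req);
 *
 * RUBY_UBF_IO or RUBY_UBF_PROCESS may be passed as the ubf, in which case
 * the built-in ubf_select() is substituted (see call_without_gvl() above).
 */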
01382
01383 VALUE
01384 rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
01385 {
01386 VALUE val = Qundef;
01387 rb_thread_t *th = GET_THREAD();
01388 int saved_errno = 0;
01389 int state;
01390
01391 th->waiting_fd = fd;
01392
01393 TH_PUSH_TAG(th);
01394 if ((state = EXEC_TAG()) == 0) {
01395 BLOCKING_REGION({
01396 val = func(data1);
01397 saved_errno = errno;
01398 }, ubf_select, th, FALSE);
01399 }
01400 TH_POP_TAG();
01401
    /* must be cleared before the possible JUMP_TAG() below */
01403 th->waiting_fd = -1;
01404
01405 if (state) {
01406 JUMP_TAG(state);
01407 }
01408
01409 RUBY_VM_CHECK_INTS_BLOCKING(th);
01410
01411 errno = saved_errno;
01412
01413 return val;
01414 }
01415
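/*
 * Compatibility wrapper: the older blocking-region entry point simply
 * forwards to rb_thread_call_without_gvl() above with the same unblocking
 * function.
 */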
01416 VALUE
01417 rb_thread_blocking_region(
01418 rb_blocking_function_t *func, void *data1,
01419 rb_unblock_function_t *ubf, void *data2)
01420 {
01421 void *(*f)(void*) = (void *(*)(void*))func;
01422 return (VALUE)rb_thread_call_without_gvl(f, data1, ubf, data2);
01423 }
01424
01425
01426
01427
01428
01429
01430
01431
01432
01433
01434
01435
01436
01437
01438
01439
01440
01441
01442
01443
01444
01445
01446
01447
01448
01449
01450
01451
01452
01453 void *
01454 rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
01455 {
01456 rb_thread_t *th = ruby_thread_from_native();
01457 struct rb_blocking_region_buffer *brb;
01458 struct rb_unblock_callback prev_unblock;
01459 void *r;
01460
01461 if (th == 0) {
	/*
	 * This is an error, but rb_bug() cannot be used here because the
	 * calling thread was not created by Ruby; report the problem and
	 * exit directly.
	 */
01467 fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n");
01468 exit(EXIT_FAILURE);
01469 }
01470
01471 brb = (struct rb_blocking_region_buffer *)th->blocking_region_buffer;
01472 prev_unblock = th->unblock;
01473
01474 if (brb == 0) {
01475 rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL.");
01476 }
01477
01478 blocking_region_end(th, brb);
01479
01480 r = (*func)(data1);
01481
01482 blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE);
01483 return r;
01484 }
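
/*
 * Illustrative sketch (the names `with_gvl_cb` and `data` are hypothetical):
 * code running under rb_thread_call_without_gvl() may briefly re-acquire the
 * GVL to call Ruby APIs:
 *
 *   static void *with_gvl_cb(void *data) { return (void *)rb_ary_new(); }
 *   ...
 *   VALUE ary = (VALUE)rb_thread_call_with_gvl(with_gvl_cb, data);
 */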
01485
01486
01487
01488
01489
01490
01491
01492
01493
01494
01495 int
01496 ruby_thread_has_gvl_p(void)
01497 {
01498 rb_thread_t *th = ruby_thread_from_native();
01499
01500 if (th && th->blocking_region_buffer == 0) {
01501 return 1;
01502 }
01503 else {
01504 return 0;
01505 }
01506 }
01507
01508
01509
01510
01511
01512
01513
01514
01515
01516 static VALUE
01517 thread_s_pass(VALUE klass)
01518 {
01519 rb_thread_schedule();
01520 return Qnil;
01521 }
01522
01523
01524
01525
01526
01527
01528
01529
01530
01531
01532
01533
01534
01535
01536
01537
01538
01539
01540
01541 void
01542 rb_threadptr_pending_interrupt_clear(rb_thread_t *th)
01543 {
01544 rb_ary_clear(th->pending_interrupt_queue);
01545 }
01546
01547 void
01548 rb_threadptr_pending_interrupt_enque(rb_thread_t *th, VALUE v)
01549 {
01550 rb_ary_push(th->pending_interrupt_queue, v);
01551 th->pending_interrupt_queue_checked = 0;
01552 }
01553
01554 enum handle_interrupt_timing {
01555 INTERRUPT_NONE,
01556 INTERRUPT_IMMEDIATE,
01557 INTERRUPT_ON_BLOCKING,
01558 INTERRUPT_NEVER
01559 };
01560
01561 static enum handle_interrupt_timing
01562 rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
01563 {
01564 VALUE mask;
01565 long mask_stack_len = RARRAY_LEN(th->pending_interrupt_mask_stack);
01566 const VALUE *mask_stack = RARRAY_CONST_PTR(th->pending_interrupt_mask_stack);
01567 VALUE ancestors = rb_mod_ancestors(err);
01568 long ancestors_len = RARRAY_LEN(ancestors);
01569 const VALUE *ancestors_ptr = RARRAY_CONST_PTR(ancestors);
01570 int i, j;
01571
01572 for (i=0; i<mask_stack_len; i++) {
01573 mask = mask_stack[mask_stack_len-(i+1)];
01574
01575 for (j=0; j<ancestors_len; j++) {
01576 VALUE klass = ancestors_ptr[j];
01577 VALUE sym;
01578
01579
01580 if ((sym = rb_hash_aref(mask, klass)) != Qnil) {
01581 if (sym == sym_immediate) {
01582 return INTERRUPT_IMMEDIATE;
01583 }
01584 else if (sym == sym_on_blocking) {
01585 return INTERRUPT_ON_BLOCKING;
01586 }
01587 else if (sym == sym_never) {
01588 return INTERRUPT_NEVER;
01589 }
01590 else {
01591 rb_raise(rb_eThreadError, "unknown mask signature");
01592 }
01593 }
01594 }
01595
01596 }
01597 return INTERRUPT_NONE;
01598 }
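
/*
 * The lookup above walks the mask stack innermost-first and, within each
 * mask, the ancestors of the error class from most specific to least, so the
 * closest Thread.handle_interrupt block and the most specific registered
 * class win.
 */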
01599
01600 static int
01601 rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th)
01602 {
01603 return RARRAY_LEN(th->pending_interrupt_queue) == 0;
01604 }
01605
01606 static int
01607 rb_threadptr_pending_interrupt_include_p(rb_thread_t *th, VALUE err)
01608 {
01609 int i;
01610 for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) {
01611 VALUE e = RARRAY_AREF(th->pending_interrupt_queue, i);
01612 if (rb_class_inherited_p(e, err)) {
01613 return TRUE;
01614 }
01615 }
01616 return FALSE;
01617 }
01618
01619 static VALUE
01620 rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timing timing)
01621 {
01622 #if 1
01623 int i;
01624
01625 for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) {
01626 VALUE err = RARRAY_AREF(th->pending_interrupt_queue, i);
01627
01628 enum handle_interrupt_timing mask_timing = rb_threadptr_pending_interrupt_check_mask(th, CLASS_OF(err));
01629
01630 switch (mask_timing) {
01631 case INTERRUPT_ON_BLOCKING:
01632 if (timing != INTERRUPT_ON_BLOCKING) {
01633 break;
01634 }
	    /* fall through */
01636 case INTERRUPT_NONE:
01637 case INTERRUPT_IMMEDIATE:
01638 rb_ary_delete_at(th->pending_interrupt_queue, i);
01639 return err;
01640 case INTERRUPT_NEVER:
01641 break;
01642 }
01643 }
01644
01645 th->pending_interrupt_queue_checked = 1;
01646 return Qundef;
01647 #else
01648 VALUE err = rb_ary_shift(th->pending_interrupt_queue);
01649 if (rb_threadptr_pending_interrupt_empty_p(th)) {
01650 th->pending_interrupt_queue_checked = 1;
01651 }
01652 return err;
01653 #endif
01654 }
01655
01656 int
01657 rb_threadptr_pending_interrupt_active_p(rb_thread_t *th)
01658 {
    /*
     * Optimization: skip scanning the pending-interrupt queue when neither
     * the queue nor the interrupt mask has changed since the last check.
     */
01664 if (th->pending_interrupt_queue_checked) {
01665 return 0;
01666 }
01667
01668 if (rb_threadptr_pending_interrupt_empty_p(th)) {
01669 return 0;
01670 }
01671
01672 return 1;
01673 }
01674
01675 static int
01676 handle_interrupt_arg_check_i(VALUE key, VALUE val)
01677 {
01678 if (val != sym_immediate && val != sym_on_blocking && val != sym_never) {
01679 rb_raise(rb_eArgError, "unknown mask signature");
01680 }
01681
01682 return ST_CONTINUE;
01683 }
01684
01685
01686
01687
01688
01689
01690
01691
01692
01693
01694
01695
01696
01697
01698
01699
01700
01701
01702
01703
01704
01705
01706
01707
01708
01709
01710
01711
01712
01713
01714
01715
01716
01717
01718
01719
01720
01721
01722
01723
01724
01725
01726
01727
01728
01729
01730
01731
01732
01733
01734
01735
01736
01737
01738
01739
01740
01741
01742
01743
01744
01745
01746
01747
01748
01749
01750
01751
01752
01753
01754
01755
01756
01757
01758
01759
01760
01761
01762
01763
01764
01765
01766
01767
01768
01769
01770
01771
01772
01773
01774
01775
01776
01777
01778
01779
01780
01781
01782
01783
01784
01785
01786
01787
01788
01789
01790
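/*
 * Thread.handle_interrupt(hash) { ... }: the hash maps exception classes (or
 * modules) to one of the symbols :immediate, :on_blocking or :never (see
 * handle_interrupt_arg_check_i above).  The mask is in effect for the block;
 * pending interrupts are re-checked both when the block is entered and when
 * it is left.
 */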
01791 static VALUE
01792 rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
01793 {
01794 VALUE mask;
01795 rb_thread_t *th = GET_THREAD();
01796 VALUE r = Qnil;
01797 int state;
01798
01799 if (!rb_block_given_p()) {
01800 rb_raise(rb_eArgError, "block is needed.");
01801 }
01802
01803 mask = rb_convert_type(mask_arg, T_HASH, "Hash", "to_hash");
01804 rb_hash_foreach(mask, handle_interrupt_arg_check_i, 0);
01805 rb_ary_push(th->pending_interrupt_mask_stack, mask);
01806 if (!rb_threadptr_pending_interrupt_empty_p(th)) {
01807 th->pending_interrupt_queue_checked = 0;
01808 RUBY_VM_SET_INTERRUPT(th);
01809 }
01810
01811 TH_PUSH_TAG(th);
01812 if ((state = EXEC_TAG()) == 0) {
01813 r = rb_yield(Qnil);
01814 }
01815 TH_POP_TAG();
01816
01817 rb_ary_pop(th->pending_interrupt_mask_stack);
01818 if (!rb_threadptr_pending_interrupt_empty_p(th)) {
01819 th->pending_interrupt_queue_checked = 0;
01820 RUBY_VM_SET_INTERRUPT(th);
01821 }
01822
01823 RUBY_VM_CHECK_INTS(th);
01824
01825 if (state) {
01826 JUMP_TAG(state);
01827 }
01828
01829 return r;
01830 }
01831
01832
01833
01834
01835
01836
01837
01838
01839
01840
01841
01842 static VALUE
01843 rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread)
01844 {
01845 rb_thread_t *target_th;
01846
01847 GetThreadPtr(target_thread, target_th);
01848
01849 if (rb_threadptr_pending_interrupt_empty_p(target_th)) {
01850 return Qfalse;
01851 }
01852 else {
01853 if (argc == 1) {
01854 VALUE err;
01855 rb_scan_args(argc, argv, "01", &err);
01856 if (!rb_obj_is_kind_of(err, rb_cModule)) {
01857 rb_raise(rb_eTypeError, "class or module required for rescue clause");
01858 }
01859 if (rb_threadptr_pending_interrupt_include_p(target_th, err)) {
01860 return Qtrue;
01861 }
01862 else {
01863 return Qfalse;
01864 }
01865 }
01866 return Qtrue;
01867 }
01868 }
01869
01870
01871
01872
01873
01874
01875
01876
01877
01878
01879
01880
01881
01882
01883
01884
01885
01886
01887
01888
01889
01890
01891
01892
01893
01894
01895
01896
01897
01898
01899
01900
01901
01902
01903
01904
01905
01906
01907
01908
01909
01910
01911
01912
01913
01914
01915
01916
01917
01918
01919
01920
01921
01922
01923
01924
01925
01926
01927 static VALUE
01928 rb_thread_s_pending_interrupt_p(int argc, VALUE *argv, VALUE self)
01929 {
01930 return rb_thread_pending_interrupt_p(argc, argv, GET_THREAD()->self);
01931 }
01932
01933 static void
01934 rb_threadptr_to_kill(rb_thread_t *th)
01935 {
01936 rb_threadptr_pending_interrupt_clear(th);
01937 th->status = THREAD_RUNNABLE;
01938 th->to_kill = 1;
01939 th->errinfo = INT2FIX(TAG_FATAL);
01940 TH_JUMP_TAG(th, TAG_FATAL);
01941 }
01942
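/*
 * Atomically fetch and clear the unmasked interrupt bits: the CAS loop
 * leaves the masked bits set in th->interrupt_flag and returns only the
 * bits that may be serviced now.
 */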
01943 static inline rb_atomic_t
01944 threadptr_get_interrupts(rb_thread_t *th)
01945 {
01946 rb_atomic_t interrupt;
01947 rb_atomic_t old;
01948
01949 do {
01950 interrupt = th->interrupt_flag;
01951 old = ATOMIC_CAS(th->interrupt_flag, interrupt, interrupt & th->interrupt_mask);
01952 } while (old != interrupt);
01953 return interrupt & (rb_atomic_t)~th->interrupt_mask;
01954 }
01955
01956 void
01957 rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
01958 {
01959 rb_atomic_t interrupt;
01960 int postponed_job_interrupt = 0;
01961
01962 if (th->raised_flag) return;
01963
01964 while ((interrupt = threadptr_get_interrupts(th)) != 0) {
01965 int sig;
01966 int timer_interrupt;
01967 int pending_interrupt;
01968 int trap_interrupt;
01969
01970 timer_interrupt = interrupt & TIMER_INTERRUPT_MASK;
01971 pending_interrupt = interrupt & PENDING_INTERRUPT_MASK;
01972 postponed_job_interrupt = interrupt & POSTPONED_JOB_INTERRUPT_MASK;
01973 trap_interrupt = interrupt & TRAP_INTERRUPT_MASK;
01974
01975 if (postponed_job_interrupt) {
01976 rb_postponed_job_flush(th->vm);
01977 }
01978
	/* signal handling */
01980 if (trap_interrupt && (th == th->vm->main_thread)) {
01981 enum rb_thread_status prev_status = th->status;
01982 th->status = THREAD_RUNNABLE;
01983 while ((sig = rb_get_next_signal()) != 0) {
01984 rb_signal_exec(th, sig);
01985 }
01986 th->status = prev_status;
01987 }
01988
	/* exception from another thread */
01990 if (pending_interrupt && rb_threadptr_pending_interrupt_active_p(th)) {
01991 VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
01992 thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);
01993
01994 if (err == Qundef) {
		/* no pending interrupt is deliverable under the current mask */
01996 }
01997 else if (err == eKillSignal ||
01998 err == eTerminateSignal ||
01999 err == INT2FIX(TAG_FATAL) ) {
02000 rb_threadptr_to_kill(th);
02001 }
02002 else {
		/* make the thread runnable so the exception is raised properly */
02004 if (th->status == THREAD_STOPPED ||
02005 th->status == THREAD_STOPPED_FOREVER)
02006 th->status = THREAD_RUNNABLE;
02007 rb_exc_raise(err);
02008 }
02009 }
02010
02011 if (timer_interrupt) {
02012 unsigned long limits_us = TIME_QUANTUM_USEC;
02013
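	    /* a positive priority multiplies the time quantum (limits_us <<= priority),
	     * a negative one divides it (limits_us >>= -priority) */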
02014 if (th->priority > 0)
02015 limits_us <<= th->priority;
02016 else
02017 limits_us >>= -th->priority;
02018
02019 if (th->status == THREAD_RUNNABLE)
02020 th->running_time_us += TIME_QUANTUM_USEC;
02021
02022 EXEC_EVENT_HOOK(th, RUBY_INTERNAL_EVENT_SWITCH, th->cfp->self, 0, 0, Qundef);
02023
02024 rb_thread_schedule_limits(limits_us);
02025 }
02026 }
02027 }
02028
02029 void
02030 rb_thread_execute_interrupts(VALUE thval)
02031 {
02032 rb_thread_t *th;
02033 GetThreadPtr(thval, th);
02034 rb_threadptr_execute_interrupts(th, 1);
02035 }
02036
02037 static void
02038 rb_threadptr_ready(rb_thread_t *th)
02039 {
02040 rb_threadptr_interrupt(th);
02041 }
02042
02043 static VALUE
02044 rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv)
02045 {
02046 VALUE exc;
02047
02048 if (rb_threadptr_dead(th)) {
02049 return Qnil;
02050 }
02051
02052 if (argc == 0) {
02053 exc = rb_exc_new(rb_eRuntimeError, 0, 0);
02054 }
02055 else {
02056 exc = rb_make_exception(argc, argv);
02057 }
02058 rb_threadptr_pending_interrupt_enque(th, exc);
02059 rb_threadptr_interrupt(th);
02060 return Qnil;
02061 }
02062
02063 void
02064 rb_threadptr_signal_raise(rb_thread_t *th, int sig)
02065 {
02066 VALUE argv[2];
02067
02068 argv[0] = rb_eSignal;
02069 argv[1] = INT2FIX(sig);
02070 rb_threadptr_raise(th->vm->main_thread, 2, argv);
02071 }
02072
02073 void
02074 rb_threadptr_signal_exit(rb_thread_t *th)
02075 {
02076 VALUE argv[2];
02077
02078 argv[0] = rb_eSystemExit;
02079 argv[1] = rb_str_new2("exit");
02080 rb_threadptr_raise(th->vm->main_thread, 2, argv);
02081 }
02082
02083 #if defined(POSIX_SIGNAL) && defined(SIGSEGV) && defined(HAVE_SIGALTSTACK)
02084 #define USE_SIGALTSTACK
02085 #endif
02086
02087 void
02088 ruby_thread_stack_overflow(rb_thread_t *th)
02089 {
02090 th->raised_flag = 0;
02091 #ifdef USE_SIGALTSTACK
02092 rb_exc_raise(sysstack_error);
02093 #else
02094 th->errinfo = sysstack_error;
02095 TH_JUMP_TAG(th, TAG_RAISE);
02096 #endif
02097 }
02098
02099 int
02100 rb_threadptr_set_raised(rb_thread_t *th)
02101 {
02102 if (th->raised_flag & RAISED_EXCEPTION) {
02103 return 1;
02104 }
02105 th->raised_flag |= RAISED_EXCEPTION;
02106 return 0;
02107 }
02108
02109 int
02110 rb_threadptr_reset_raised(rb_thread_t *th)
02111 {
02112 if (!(th->raised_flag & RAISED_EXCEPTION)) {
02113 return 0;
02114 }
02115 th->raised_flag &= ~RAISED_EXCEPTION;
02116 return 1;
02117 }
02118
02119 static int
02120 thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data)
02121 {
02122 int fd = (int)data;
02123 rb_thread_t *th;
02124 GetThreadPtr((VALUE)key, th);
02125
02126 if (th->waiting_fd == fd) {
02127 VALUE err = th->vm->special_exceptions[ruby_error_closed_stream];
02128 rb_threadptr_pending_interrupt_enque(th, err);
02129 rb_threadptr_interrupt(th);
02130 }
02131 return ST_CONTINUE;
02132 }
02133
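/*
 * rb_thread_fd_close(fd): notify every living thread blocked on `fd`
 * (th->waiting_fd == fd) by queueing the "stream closed" special exception
 * and interrupting it.
 */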
02134 void
02135 rb_thread_fd_close(int fd)
02136 {
02137 st_foreach(GET_THREAD()->vm->living_threads, thread_fd_close_i, (st_index_t)fd);
02138 }
02139
02140
02141
02142
02143
02144
02145
02146
02147
02148
02149
02150
02151
02152
02153
02154
02155
02156
02157
02158
02159
02160
02161 static VALUE
02162 thread_raise_m(int argc, VALUE *argv, VALUE self)
02163 {
02164 rb_thread_t *target_th;
02165 rb_thread_t *th = GET_THREAD();
02166 GetThreadPtr(self, target_th);
02167 rb_threadptr_raise(target_th, argc, argv);
02168
    /* to make Thread.current.raise behave like Kernel#raise */
02170 if (th == target_th) {
02171 RUBY_VM_CHECK_INTS(th);
02172 }
02173 return Qnil;
02174 }
02175
02176
02177
02178
02179
02180
02181
02182
02183
02184
02185
02186
02187
02188
02189
02190 VALUE
02191 rb_thread_kill(VALUE thread)
02192 {
02193 rb_thread_t *th;
02194
02195 GetThreadPtr(thread, th);
02196
02197 if (th->to_kill || th->status == THREAD_KILLED) {
02198 return thread;
02199 }
02200 if (th == th->vm->main_thread) {
02201 rb_exit(EXIT_SUCCESS);
02202 }
02203
02204 thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id);
02205
02206 if (th == GET_THREAD()) {
	/* kill myself immediately */
02208 rb_threadptr_to_kill(th);
02209 }
02210 else {
02211 rb_threadptr_pending_interrupt_enque(th, eKillSignal);
02212 rb_threadptr_interrupt(th);
02213 }
02214 return thread;
02215 }
02216
02217
02218
02219
02220
02221
02222
02223
02224
02225
02226
02227
02228
02229
02230
02231
02232 static VALUE
02233 rb_thread_s_kill(VALUE obj, VALUE th)
02234 {
02235 return rb_thread_kill(th);
02236 }
02237
02238
02239
02240
02241
02242
02243
02244
02245
02246
02247
02248
02249
02250
02251 static VALUE
02252 rb_thread_exit(void)
02253 {
02254 rb_thread_t *th = GET_THREAD();
02255 return rb_thread_kill(th->self);
02256 }
02257
02258
02259
02260
02261
02262
02263
02264
02265
02266
02267
02268
02269
02270
02271
02272
02273
02274
02275 VALUE
02276 rb_thread_wakeup(VALUE thread)
02277 {
02278 if (!RTEST(rb_thread_wakeup_alive(thread))) {
02279 rb_raise(rb_eThreadError, "killed thread");
02280 }
02281 return thread;
02282 }
02283
02284 VALUE
02285 rb_thread_wakeup_alive(VALUE thread)
02286 {
02287 rb_thread_t *th;
02288 GetThreadPtr(thread, th);
02289
02290 if (th->status == THREAD_KILLED) {
02291 return Qnil;
02292 }
02293 rb_threadptr_ready(th);
02294 if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER)
02295 th->status = THREAD_RUNNABLE;
02296 return thread;
02297 }
02298
02299
02300
02301
02302
02303
02304
02305
02306
02307
02308
02309
02310
02311
02312
02313
02314
02315
02316
02317
02318
02319
02320
02321 VALUE
02322 rb_thread_run(VALUE thread)
02323 {
02324 rb_thread_wakeup(thread);
02325 rb_thread_schedule();
02326 return thread;
02327 }
02328
02329
02330
02331
02332
02333
02334
02335
02336
02337
02338
02339
02340
02341
02342
02343
02344
02345 VALUE
02346 rb_thread_stop(void)
02347 {
02348 if (rb_thread_alone()) {
02349 rb_raise(rb_eThreadError,
02350 "stopping only thread\n\tnote: use sleep to stop forever");
02351 }
02352 rb_thread_sleep_deadly();
02353 return Qnil;
02354 }
02355
02356 static int
02357 thread_list_i(st_data_t key, st_data_t val, void *data)
02358 {
02359 VALUE ary = (VALUE)data;
02360 rb_thread_t *th;
02361 GetThreadPtr((VALUE)key, th);
02362
02363 switch (th->status) {
02364 case THREAD_RUNNABLE:
02365 case THREAD_STOPPED:
02366 case THREAD_STOPPED_FOREVER:
02367 rb_ary_push(ary, th->self);
02368 default:
02369 break;
02370 }
02371 return ST_CONTINUE;
02372 }
02373
02374
02375
02376
02377
02378
02379
02380
02381
02382
02383
02384
02385
02386
02387
02388
02389
02390
02391
02392
02393
02394
02395
02396 VALUE
02397 rb_thread_list(void)
02398 {
02399 VALUE ary = rb_ary_new();
02400 st_foreach(GET_THREAD()->vm->living_threads, thread_list_i, ary);
02401 return ary;
02402 }
02403
02404 VALUE
02405 rb_thread_current(void)
02406 {
02407 return GET_THREAD()->self;
02408 }
02409
02410
02411
02412
02413
02414
02415
02416
02417
02418
02419 static VALUE
02420 thread_s_current(VALUE klass)
02421 {
02422 return rb_thread_current();
02423 }
02424
02425 VALUE
02426 rb_thread_main(void)
02427 {
02428 return GET_THREAD()->vm->main_thread->self;
02429 }
02430
02431
02432
02433
02434
02435
02436
02437
02438 static VALUE
02439 rb_thread_s_main(VALUE klass)
02440 {
02441 return rb_thread_main();
02442 }
02443
02444
02445
02446
02447
02448
02449
02450
02451
02452
02453
02454
02455
02456
02457
02458
02459
02460
02461
02462
02463
02464
02465 static VALUE
02466 rb_thread_s_abort_exc(void)
02467 {
02468 return GET_THREAD()->vm->thread_abort_on_exception ? Qtrue : Qfalse;
02469 }
02470
02471
02472
02473
02474
02475
02476
02477
02478
02479
02480
02481
02482
02483
02484
02485
02486
02487
02488
02489
02490
02491
02492
02493
02494
02495
02496
02497
02498
02499
02500
02501 static VALUE
02502 rb_thread_s_abort_exc_set(VALUE self, VALUE val)
02503 {
02504 GET_THREAD()->vm->thread_abort_on_exception = RTEST(val);
02505 return val;
02506 }
02507
02508
02509
02510
02511
02512
02513
02514
02515
02516
02517
02518
02519
02520
02521
02522
02523
02524 static VALUE
02525 rb_thread_abort_exc(VALUE thread)
02526 {
02527 rb_thread_t *th;
02528 GetThreadPtr(thread, th);
02529 return th->abort_on_exception ? Qtrue : Qfalse;
02530 }
02531
02532
02533
02534
02535
02536
02537
02538
02539
02540
02541
02542
02543
02544
02545
02546
02547
02548 static VALUE
02549 rb_thread_abort_exc_set(VALUE thread, VALUE val)
02550 {
02551 rb_thread_t *th;
02552
02553 GetThreadPtr(thread, th);
02554 th->abort_on_exception = RTEST(val);
02555 return val;
02556 }
02557
02558
02559
02560
02561
02562
02563
02564
02565
02566
02567
02568
02569 VALUE
02570 rb_thread_group(VALUE thread)
02571 {
02572 rb_thread_t *th;
02573 VALUE group;
02574 GetThreadPtr(thread, th);
02575 group = th->thgroup;
02576
02577 if (!group) {
02578 group = Qnil;
02579 }
02580 return group;
02581 }
02582
02583 static const char *
02584 thread_status_name(rb_thread_t *th)
02585 {
02586 switch (th->status) {
02587 case THREAD_RUNNABLE:
02588 if (th->to_kill)
02589 return "aborting";
02590 else
02591 return "run";
02592 case THREAD_STOPPED:
02593 case THREAD_STOPPED_FOREVER:
02594 return "sleep";
02595 case THREAD_KILLED:
02596 return "dead";
02597 default:
02598 return "unknown";
02599 }
02600 }
02601
02602 static int
02603 rb_threadptr_dead(rb_thread_t *th)
02604 {
02605 return th->status == THREAD_KILLED;
02606 }
02607
02608
02609
02610
02611
02612
02613
02614
02615
02616
02617
02618
02619
02620
02621
02622
02623
02624
02625
02626
02627
02628
02629
02630
02631
02632
02633
02634
02635
02636
02637
02638
02639
02640 static VALUE
02641 rb_thread_status(VALUE thread)
02642 {
02643 rb_thread_t *th;
02644 GetThreadPtr(thread, th);
02645
02646 if (rb_threadptr_dead(th)) {
02647 if (!NIL_P(th->errinfo) && !FIXNUM_P(th->errinfo)
02648 ) {
02649 return Qnil;
02650 }
02651 return Qfalse;
02652 }
02653 return rb_str_new2(thread_status_name(th));
02654 }
02655
02656
02657
02658
02659
02660
02661
02662
02663
02664
02665
02666
02667
02668
02669
02670
02671 static VALUE
02672 rb_thread_alive_p(VALUE thread)
02673 {
02674 rb_thread_t *th;
02675 GetThreadPtr(thread, th);
02676
02677 if (rb_threadptr_dead(th))
02678 return Qfalse;
02679 return Qtrue;
02680 }
02681
02682
02683
02684
02685
02686
02687
02688
02689
02690
02691
02692
02693
02694
02695
02696 static VALUE
02697 rb_thread_stop_p(VALUE thread)
02698 {
02699 rb_thread_t *th;
02700 GetThreadPtr(thread, th);
02701
02702 if (rb_threadptr_dead(th))
02703 return Qtrue;
02704 if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER)
02705 return Qtrue;
02706 return Qfalse;
02707 }
02708
02709
02710
02711
02712
02713
02714
02715
02716
02717
02718
02719
02720
02721 static VALUE
02722 rb_thread_safe_level(VALUE thread)
02723 {
02724 rb_thread_t *th;
02725 GetThreadPtr(thread, th);
02726
02727 return INT2NUM(th->safe_level);
02728 }
02729
02730
02731
02732
02733
02734
02735
02736
02737 static VALUE
02738 rb_thread_inspect(VALUE thread)
02739 {
02740 const char *cname = rb_obj_classname(thread);
02741 rb_thread_t *th;
02742 const char *status;
02743 VALUE str;
02744
02745 GetThreadPtr(thread, th);
02746 status = thread_status_name(th);
02747 str = rb_sprintf("#<%s:%p %s>", cname, (void *)thread, status);
02748 OBJ_INFECT(str, thread);
02749
02750 return str;
02751 }
02752
02753 static VALUE
02754 threadptr_local_aref(rb_thread_t *th, ID id)
02755 {
02756 st_data_t val;
02757
02758 if (th->local_storage && st_lookup(th->local_storage, id, &val)) {
02759 return (VALUE)val;
02760 }
02761 return Qnil;
02762 }
02763
02764 VALUE
02765 rb_thread_local_aref(VALUE thread, ID id)
02766 {
02767 rb_thread_t *th;
02768 GetThreadPtr(thread, th);
02769 return threadptr_local_aref(th, id);
02770 }
02771
02772
02773
02774
02775
02776
02777
02778
02779
02780
02781
02782
02783
02784
02785
02786
02787
02788
02789
02790
02791
02792
02793
02794
02795
02796
02797
02798
02799
02800
02801
02802
02803
02804
02805
02806
02807
02808
02809
02810
02811
02812
02813
02814
02815
02816
02817
02818
02819
02820
02821
02822
02823
02824
02825
02826
02827
02828
02829
02830
02831
02832 static VALUE
02833 rb_thread_aref(VALUE thread, VALUE key)
02834 {
02835 ID id = rb_check_id(&key);
02836 if (!id) return Qnil;
02837 return rb_thread_local_aref(thread, id);
02838 }
02839
02840 static VALUE
02841 threadptr_local_aset(rb_thread_t *th, ID id, VALUE val)
02842 {
02843 if (NIL_P(val)) {
02844 if (!th->local_storage) return Qnil;
02845 st_delete_wrap(th->local_storage, id);
02846 return Qnil;
02847 }
02848 else {
02849 if (!th->local_storage) {
02850 th->local_storage = st_init_numtable();
02851 }
02852 st_insert(th->local_storage, id, val);
02853 return val;
02854 }
02855 }
02856
02857 VALUE
02858 rb_thread_local_aset(VALUE thread, ID id, VALUE val)
02859 {
02860 rb_thread_t *th;
02861 GetThreadPtr(thread, th);
02862
02863 if (OBJ_FROZEN(thread)) {
02864 rb_error_frozen("thread locals");
02865 }
02866
02867 return threadptr_local_aset(th, id, val);
02868 }
02869
02870
02871
02872
02873
02874
02875
02876
02877
02878
02879
02880
02881
02882
02883 static VALUE
02884 rb_thread_aset(VALUE self, VALUE id, VALUE val)
02885 {
02886 return rb_thread_local_aset(self, rb_to_id(id), val);
02887 }
02888
02889
02890
02891
02892
02893
02894
02895
02896
02897
02898
02899
02900
02901
02902
02903
02904
02905
02906
02907
02908
02909
02910
02911
02912
02913
02914
02915
02916
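/*
 *  call-seq:
 *     thr.thread_variable_get(key)   -> obj or nil
 *
 *  Returns the value of a thread-local variable that has been set.  Unlike
 *  Thread#[], these variables belong to the thread itself and are not
 *  fiber-local.  Returns +nil+ if the variable is not set.
 */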
02917 static VALUE
02918 rb_thread_variable_get(VALUE thread, VALUE key)
02919 {
02920 VALUE locals;
02921 ID id = rb_check_id(&key);
02922
02923 if (!id) return Qnil;
02924 locals = rb_ivar_get(thread, id_locals);
02925 return rb_hash_aref(locals, ID2SYM(id));
02926 }
02927
02928
02929
02930
02931
02932
02933
02934
02935
02936
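/*
 *  call-seq:
 *     thr.thread_variable_set(key, value)   -> value
 *
 *  Sets a thread-local variable +key+ to +value+ (not fiber-local; see
 *  Thread#thread_variable_get).  Raises an error if the thread is frozen.
 */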
02937 static VALUE
02938 rb_thread_variable_set(VALUE thread, VALUE id, VALUE val)
02939 {
02940 VALUE locals;
02941
02942 if (OBJ_FROZEN(thread)) {
02943 rb_error_frozen("thread locals");
02944 }
02945
02946 locals = rb_ivar_get(thread, id_locals);
02947 return rb_hash_aset(locals, ID2SYM(rb_to_id(id)), val);
02948 }
02949
02950
02951
02952
02953
02954
02955
02956
02957
02958
02959
02960
02961
02962
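/*
 *  call-seq:
 *     thr.key?(sym)   -> true or false
 *
 *  Returns +true+ if the given string or symbol exists as a fiber-local
 *  variable of +thr+.
 */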
02963 static VALUE
02964 rb_thread_key_p(VALUE self, VALUE key)
02965 {
02966 rb_thread_t *th;
02967 ID id = rb_check_id(&key);
02968
02969 GetThreadPtr(self, th);
02970
02971 if (!id || !th->local_storage) {
02972 return Qfalse;
02973 }
02974 if (st_lookup(th->local_storage, id, 0)) {
02975 return Qtrue;
02976 }
02977 return Qfalse;
02978 }
02979
02980 static int
02981 thread_keys_i(ID key, VALUE value, VALUE ary)
02982 {
02983 rb_ary_push(ary, ID2SYM(key));
02984 return ST_CONTINUE;
02985 }
02986
02987 static int
02988 vm_living_thread_num(rb_vm_t *vm)
02989 {
02990 return (int)vm->living_threads->num_entries;
02991 }
02992
02993 int
02994 rb_thread_alone(void)
02995 {
02996 int num = 1;
02997 if (GET_THREAD()->vm->living_threads) {
02998 num = vm_living_thread_num(GET_THREAD()->vm);
02999 thread_debug("rb_thread_alone: %d\n", num);
03000 }
03001 return num == 1;
03002 }
03003
03004
03005
03006
03007
03008
03009
03010
03011
03012
03013
03014
03015
03016
03017
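/*
 *  call-seq:
 *     thr.keys   -> array
 *
 *  Returns an array of the names (as symbols) of the fiber-local variables
 *  of +thr+.
 */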
03018 static VALUE
03019 rb_thread_keys(VALUE self)
03020 {
03021 rb_thread_t *th;
03022 VALUE ary = rb_ary_new();
03023 GetThreadPtr(self, th);
03024
03025 if (th->local_storage) {
03026 st_foreach(th->local_storage, thread_keys_i, ary);
03027 }
03028 return ary;
03029 }
03030
03031 static int
03032 keys_i(VALUE key, VALUE value, VALUE ary)
03033 {
03034 rb_ary_push(ary, key);
03035 return ST_CONTINUE;
03036 }
03037
03038
03039
03040
03041
03042
03043
03044
03045
03046
03047
03048
03049
03050
03051
03052
03053
03054
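/*
 *  call-seq:
 *     thr.thread_variables   -> array
 *
 *  Returns an array of the names (as symbols) of the thread-local
 *  variables of +thr+ (not the fiber-local ones; see Thread#keys).
 */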
03055 static VALUE
03056 rb_thread_variables(VALUE thread)
03057 {
03058 VALUE locals;
03059 VALUE ary;
03060
03061 locals = rb_ivar_get(thread, id_locals);
03062 ary = rb_ary_new();
03063 rb_hash_foreach(locals, keys_i, ary);
03064
03065 return ary;
03066 }
03067
03068
03069
03070
03071
03072
03073
03074
03075
03076
03077
03078
03079
03080
03081
03082
03083
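/*
 *  call-seq:
 *     thr.thread_variable?(key)   -> true or false
 *
 *  Returns +true+ if the given string or symbol exists as a thread-local
 *  variable of +thr+.
 */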
03084 static VALUE
03085 rb_thread_variable_p(VALUE thread, VALUE key)
03086 {
03087 VALUE locals;
03088 ID id = rb_check_id(&key);
03089
03090 if (!id) return Qfalse;
03091
03092 locals = rb_ivar_get(thread, id_locals);
03093
03094 if (!RHASH(locals)->ntbl)
03095 return Qfalse;
03096
03097 if (st_lookup(RHASH(locals)->ntbl, ID2SYM(id), 0)) {
03098 return Qtrue;
03099 }
03100
03101 return Qfalse;
03102 }
03103
03104
03105
03106
03107
03108
03109
03110
03111
03112
03113
03114
03115
03116
03117
03118
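/*
 *  call-seq:
 *     thr.priority   -> integer
 *
 *  Returns the priority of +thr+.
 */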
03119 static VALUE
03120 rb_thread_priority(VALUE thread)
03121 {
03122 rb_thread_t *th;
03123 GetThreadPtr(thread, th);
03124 return INT2NUM(th->priority);
03125 }
03126
03127
03128
03129
03130
03131
03132
03133
03134
03135
03136
03137
03138
03139
03140
03141
03142
03143
03144
03145
03146
03147
03148
03149
03150
03151
03152
03153
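/*
 *  call-seq:
 *     thr.priority = integer   -> integer
 *
 *  Sets the priority of +thr+.  Higher-priority threads are scheduled more
 *  often than lower-priority ones.  When native thread priorities are not
 *  used, the value is clamped to the range supported by the interpreter,
 *  as shown below.
 */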
03154 static VALUE
03155 rb_thread_priority_set(VALUE thread, VALUE prio)
03156 {
03157 rb_thread_t *th;
03158 int priority;
03159 GetThreadPtr(thread, th);
03160
03161
03162 #if USE_NATIVE_THREAD_PRIORITY
03163 th->priority = NUM2INT(prio);
03164 native_thread_apply_priority(th);
03165 #else
03166 priority = NUM2INT(prio);
03167 if (priority > RUBY_THREAD_PRIORITY_MAX) {
03168 priority = RUBY_THREAD_PRIORITY_MAX;
03169 }
03170 else if (priority < RUBY_THREAD_PRIORITY_MIN) {
03171 priority = RUBY_THREAD_PRIORITY_MIN;
03172 }
03173 th->priority = priority;
03174 #endif
03175 return INT2NUM(th->priority);
03176 }
03177
03178
03179
03180 #if defined(NFDBITS) && defined(HAVE_RB_FD_INIT)
03181
03182
03183
03184
03185
03186
03187
03188
03189
03190
03191
03192
03193
03194
03195
03196
03197
03198
03199
03200
03201
03202
03203
03204
03205
03206
03207
03208
03209
03210
03211
03212
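/*
 * Dynamically sized fd_set emulation.  rb_fdset_t grows its backing fd_set
 * on demand (see rb_fd_resize()), so select() can be used with descriptors
 * beyond the static FD_SETSIZE limit.  The FD_* macros are redefined below
 * to operate on rb_fdset_t.
 */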
03213 void
03214 rb_fd_init(rb_fdset_t *fds)
03215 {
03216 fds->maxfd = 0;
03217 fds->fdset = ALLOC(fd_set);
03218 FD_ZERO(fds->fdset);
03219 }
03220
03221 void
03222 rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
03223 {
03224 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
03225
03226 if (size < sizeof(fd_set))
03227 size = sizeof(fd_set);
03228 dst->maxfd = src->maxfd;
03229 dst->fdset = xmalloc(size);
03230 memcpy(dst->fdset, src->fdset, size);
03231 }
03232
03233 void
03234 rb_fd_term(rb_fdset_t *fds)
03235 {
03236 if (fds->fdset) xfree(fds->fdset);
03237 fds->maxfd = 0;
03238 fds->fdset = 0;
03239 }
03240
03241 void
03242 rb_fd_zero(rb_fdset_t *fds)
03243 {
03244 if (fds->fdset)
03245 MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS));
03246 }
03247
03248 static void
03249 rb_fd_resize(int n, rb_fdset_t *fds)
03250 {
03251 size_t m = howmany(n + 1, NFDBITS) * sizeof(fd_mask);
03252 size_t o = howmany(fds->maxfd, NFDBITS) * sizeof(fd_mask);
03253
03254 if (m < sizeof(fd_set)) m = sizeof(fd_set);
03255 if (o < sizeof(fd_set)) o = sizeof(fd_set);
03256
03257 if (m > o) {
03258 fds->fdset = xrealloc(fds->fdset, m);
03259 memset((char *)fds->fdset + o, 0, m - o);
03260 }
03261 if (n >= fds->maxfd) fds->maxfd = n + 1;
03262 }
03263
03264 void
03265 rb_fd_set(int n, rb_fdset_t *fds)
03266 {
03267 rb_fd_resize(n, fds);
03268 FD_SET(n, fds->fdset);
03269 }
03270
03271 void
03272 rb_fd_clr(int n, rb_fdset_t *fds)
03273 {
03274 if (n >= fds->maxfd) return;
03275 FD_CLR(n, fds->fdset);
03276 }
03277
03278 int
03279 rb_fd_isset(int n, const rb_fdset_t *fds)
03280 {
03281 if (n >= fds->maxfd) return 0;
03282 return FD_ISSET(n, fds->fdset) != 0;
03283 }
03284
03285 void
03286 rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max)
03287 {
03288 size_t size = howmany(max, NFDBITS) * sizeof(fd_mask);
03289
03290 if (size < sizeof(fd_set)) size = sizeof(fd_set);
03291 dst->maxfd = max;
03292 dst->fdset = xrealloc(dst->fdset, size);
03293 memcpy(dst->fdset, src, size);
03294 }
03295
03296 static void
03297 rb_fd_rcopy(fd_set *dst, rb_fdset_t *src)
03298 {
03299 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
03300
03301 if (size > sizeof(fd_set)) {
03302 rb_raise(rb_eArgError, "too large fdsets");
03303 }
03304 memcpy(dst, rb_fd_ptr(src), sizeof(fd_set));
03305 }
03306
03307 void
03308 rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src)
03309 {
03310 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
03311
03312 if (size < sizeof(fd_set))
03313 size = sizeof(fd_set);
03314 dst->maxfd = src->maxfd;
03315 dst->fdset = xrealloc(dst->fdset, size);
03316 memcpy(dst->fdset, src->fdset, size);
03317 }
03318
03319 #ifdef __native_client__
03320 int select(int nfds, fd_set *readfds, fd_set *writefds,
03321 fd_set *exceptfds, struct timeval *timeout);
03322 #endif
03323
03324 int
03325 rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout)
03326 {
03327 fd_set *r = NULL, *w = NULL, *e = NULL;
03328 if (readfds) {
03329 rb_fd_resize(n - 1, readfds);
03330 r = rb_fd_ptr(readfds);
03331 }
03332 if (writefds) {
03333 rb_fd_resize(n - 1, writefds);
03334 w = rb_fd_ptr(writefds);
03335 }
03336 if (exceptfds) {
03337 rb_fd_resize(n - 1, exceptfds);
03338 e = rb_fd_ptr(exceptfds);
03339 }
03340 return select(n, r, w, e, timeout);
03341 }
03342
03343 #undef FD_ZERO
03344 #undef FD_SET
03345 #undef FD_CLR
03346 #undef FD_ISSET
03347
03348 #define FD_ZERO(f) rb_fd_zero(f)
03349 #define FD_SET(i, f) rb_fd_set((i), (f))
03350 #define FD_CLR(i, f) rb_fd_clr((i), (f))
03351 #define FD_ISSET(i, f) rb_fd_isset((i), (f))
03352
03353 #elif defined(_WIN32)
03354
03355 void
03356 rb_fd_init(rb_fdset_t *set)
03357 {
03358 set->capa = FD_SETSIZE;
03359 set->fdset = ALLOC(fd_set);
03360 FD_ZERO(set->fdset);
03361 }
03362
03363 void
03364 rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
03365 {
03366 rb_fd_init(dst);
03367 rb_fd_dup(dst, src);
03368 }
03369
03370 static void
03371 rb_fd_rcopy(fd_set *dst, rb_fdset_t *src)
03372 {
03373 int max = rb_fd_max(src);
03374
03375
03376
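    /* dst is assumed to be at least as large as src (src is typically the
     * result of select() on dst), so copying max SOCKET entries must fit. */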
03377 if (max > FD_SETSIZE || (UINT)max > dst->fd_count) {
03378 rb_raise(rb_eArgError, "too large fdsets");
03379 }
03380
03381 memcpy(dst->fd_array, src->fdset->fd_array, sizeof(src->fdset->fd_array[0]) * max);
03382 dst->fd_count = max;
03383 }
03384
03385 void
03386 rb_fd_term(rb_fdset_t *set)
03387 {
03388 xfree(set->fdset);
03389 set->fdset = NULL;
03390 set->capa = 0;
03391 }
03392
03393 void
03394 rb_fd_set(int fd, rb_fdset_t *set)
03395 {
03396 unsigned int i;
03397 SOCKET s = rb_w32_get_osfhandle(fd);
03398
03399 for (i = 0; i < set->fdset->fd_count; i++) {
03400 if (set->fdset->fd_array[i] == s) {
03401 return;
03402 }
03403 }
03404 if (set->fdset->fd_count >= (unsigned)set->capa) {
03405 set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE;
03406 set->fdset = xrealloc(set->fdset, sizeof(unsigned int) + sizeof(SOCKET) * set->capa);
03407 }
03408 set->fdset->fd_array[set->fdset->fd_count++] = s;
03409 }
03410
03411 #undef FD_ZERO
03412 #undef FD_SET
03413 #undef FD_CLR
03414 #undef FD_ISSET
03415
03416 #define FD_ZERO(f) rb_fd_zero(f)
03417 #define FD_SET(i, f) rb_fd_set((i), (f))
03418 #define FD_CLR(i, f) rb_fd_clr((i), (f))
03419 #define FD_ISSET(i, f) rb_fd_isset((i), (f))
03420
03421 #else
03422 #define rb_fd_rcopy(d, s) (*(d) = *(s))
03423 #endif
03424
03425 static int
03426 do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except,
03427 struct timeval *timeout)
03428 {
03429 int UNINITIALIZED_VAR(result);
03430 int lerrno;
03431 rb_fdset_t UNINITIALIZED_VAR(orig_read);
03432 rb_fdset_t UNINITIALIZED_VAR(orig_write);
03433 rb_fdset_t UNINITIALIZED_VAR(orig_except);
03434 double limit = 0;
03435 struct timeval wait_rest;
03436 rb_thread_t *th = GET_THREAD();
03437
03438 if (timeout) {
03439 limit = timeofday();
03440 limit += (double)timeout->tv_sec+(double)timeout->tv_usec*1e-6;
03441 wait_rest = *timeout;
03442 timeout = &wait_rest;
03443 }
03444
03445 if (read)
03446 rb_fd_init_copy(&orig_read, read);
03447 if (write)
03448 rb_fd_init_copy(&orig_write, write);
03449 if (except)
03450 rb_fd_init_copy(&orig_except, except);
03451
03452 retry:
03453 lerrno = 0;
03454
03455 BLOCKING_REGION({
03456 result = native_fd_select(n, read, write, except, timeout, th);
03457 if (result < 0) lerrno = errno;
03458 }, ubf_select, th, FALSE);
03459
03460 RUBY_VM_CHECK_INTS_BLOCKING(th);
03461
03462 errno = lerrno;
03463
03464 if (result < 0) {
03465 switch (errno) {
03466 case EINTR:
03467 #ifdef ERESTART
03468 case ERESTART:
03469 #endif
03470 if (read)
03471 rb_fd_dup(read, &orig_read);
03472 if (write)
03473 rb_fd_dup(write, &orig_write);
03474 if (except)
03475 rb_fd_dup(except, &orig_except);
03476
03477 if (timeout) {
03478 double d = limit - timeofday();
03479
03480 wait_rest.tv_sec = (time_t)d;
03481 wait_rest.tv_usec = (int)((d-(double)wait_rest.tv_sec)*1e6);
03482 if (wait_rest.tv_sec < 0) wait_rest.tv_sec = 0;
03483 if (wait_rest.tv_usec < 0) wait_rest.tv_usec = 0;
03484 }
03485
03486 goto retry;
03487 default:
03488 break;
03489 }
03490 }
03491
03492 if (read)
03493 rb_fd_term(&orig_read);
03494 if (write)
03495 rb_fd_term(&orig_write);
03496 if (except)
03497 rb_fd_term(&orig_except);
03498
03499 return result;
03500 }
03501
03502 static void
03503 rb_thread_wait_fd_rw(int fd, int read)
03504 {
03505 int result = 0;
03506 int events = read ? RB_WAITFD_IN : RB_WAITFD_OUT;
03507
03508 thread_debug("rb_thread_wait_fd_rw(%d, %s)\n", fd, read ? "read" : "write");
03509
03510 if (fd < 0) {
03511 rb_raise(rb_eIOError, "closed stream");
03512 }
03513
03514 result = rb_wait_for_single_fd(fd, events, NULL);
03515 if (result < 0) {
03516 rb_sys_fail(0);
03517 }
03518
03519 thread_debug("rb_thread_wait_fd_rw(%d, %s): done\n", fd, read ? "read" : "write");
03520 }
03521
03522 void
03523 rb_thread_wait_fd(int fd)
03524 {
03525 rb_thread_wait_fd_rw(fd, 1);
03526 }
03527
03528 int
03529 rb_thread_fd_writable(int fd)
03530 {
03531 rb_thread_wait_fd_rw(fd, 0);
03532 return TRUE;
03533 }
03534
03535 int
03536 rb_thread_select(int max, fd_set * read, fd_set * write, fd_set * except,
03537 struct timeval *timeout)
03538 {
03539 rb_fdset_t fdsets[3];
03540 rb_fdset_t *rfds = NULL;
03541 rb_fdset_t *wfds = NULL;
03542 rb_fdset_t *efds = NULL;
03543 int retval;
03544
03545 if (read) {
03546 rfds = &fdsets[0];
03547 rb_fd_init(rfds);
03548 rb_fd_copy(rfds, read, max);
03549 }
03550 if (write) {
03551 wfds = &fdsets[1];
03552 rb_fd_init(wfds);
03553 rb_fd_copy(wfds, write, max);
03554 }
03555 if (except) {
03556 efds = &fdsets[2];
03557 rb_fd_init(efds);
03558 rb_fd_copy(efds, except, max);
03559 }
03560
03561 retval = rb_thread_fd_select(max, rfds, wfds, efds, timeout);
03562
03563 if (rfds) {
03564 rb_fd_rcopy(read, rfds);
03565 rb_fd_term(rfds);
03566 }
03567 if (wfds) {
03568 rb_fd_rcopy(write, wfds);
03569 rb_fd_term(wfds);
03570 }
03571 if (efds) {
03572 rb_fd_rcopy(except, efds);
03573 rb_fd_term(efds);
03574 }
03575
03576 return retval;
03577 }
03578
03579 int
03580 rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
03581 struct timeval *timeout)
03582 {
03583 if (!read && !write && !except) {
03584 if (!timeout) {
03585 rb_thread_sleep_forever();
03586 return 0;
03587 }
03588 rb_thread_wait_for(*timeout);
03589 return 0;
03590 }
03591
03592 if (read) {
03593 rb_fd_resize(max - 1, read);
03594 }
03595 if (write) {
03596 rb_fd_resize(max - 1, write);
03597 }
03598 if (except) {
03599 rb_fd_resize(max - 1, except);
03600 }
03601 return do_select(max, read, write, except, timeout);
03602 }
03603
03604
03605
03606
03607
03608
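/*
 * poll() is used only on Linux here; other platforms fall back to the
 * select()-based implementation of rb_wait_for_single_fd() below.
 */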
03609 #if defined(HAVE_POLL) && defined(__linux__)
03610 # define USE_POLL
03611 #endif
03612
03613 #ifdef USE_POLL
03614
03615
03616 #define POLLIN_SET (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR)
03617 #define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR)
03618 #define POLLEX_SET (POLLPRI)
03619
03620 #ifndef HAVE_PPOLL
03621
03622 int
03623 ppoll(struct pollfd *fds, nfds_t nfds,
03624 const struct timespec *ts, const sigset_t *sigmask)
03625 {
03626 int timeout_ms;
03627
03628 if (ts) {
03629 int tmp, tmp2;
03630
03631 if (ts->tv_sec > TIMET_MAX/1000)
03632 timeout_ms = -1;
03633 else {
03634 tmp = ts->tv_sec * 1000;
03635 tmp2 = ts->tv_nsec / (1000 * 1000);
03636 if (TIMET_MAX - tmp < tmp2)
03637 timeout_ms = -1;
03638 else
03639 timeout_ms = tmp + tmp2;
03640 }
03641 }
03642 else
03643 timeout_ms = -1;
03644
03645 return poll(fds, nfds, timeout_ms);
03646 }
03647 #endif
03648
03649
03650
03651
03652 int
03653 rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
03654 {
03655 struct pollfd fds;
03656 int result = 0, lerrno;
03657 double limit = 0;
03658 struct timespec ts;
03659 struct timespec *timeout = NULL;
03660 rb_thread_t *th = GET_THREAD();
03661
03662 if (tv) {
03663 ts.tv_sec = tv->tv_sec;
03664 ts.tv_nsec = tv->tv_usec * 1000;
03665 limit = timeofday();
03666 limit += (double)tv->tv_sec + (double)tv->tv_usec * 1e-6;
03667 timeout = &ts;
03668 }
03669
03670 fds.fd = fd;
03671 fds.events = (short)events;
03672
03673 retry:
03674 lerrno = 0;
03675 BLOCKING_REGION({
03676 result = ppoll(&fds, 1, timeout, NULL);
03677 if (result < 0) lerrno = errno;
03678 }, ubf_select, th, FALSE);
03679
03680 RUBY_VM_CHECK_INTS_BLOCKING(th);
03681
03682 if (result < 0) {
03683 errno = lerrno;
03684 switch (errno) {
03685 case EINTR:
03686 #ifdef ERESTART
03687 case ERESTART:
03688 #endif
03689 if (timeout) {
03690 double d = limit - timeofday();
03691
03692 ts.tv_sec = (long)d;
03693 ts.tv_nsec = (long)((d - (double)ts.tv_sec) * 1e9);
03694 if (ts.tv_sec < 0)
03695 ts.tv_sec = 0;
03696 if (ts.tv_nsec < 0)
03697 ts.tv_nsec = 0;
03698 }
03699 goto retry;
03700 }
03701 return -1;
03702 }
03703
03704 if (fds.revents & POLLNVAL) {
03705 errno = EBADF;
03706 return -1;
03707 }
03708
03709
03710
03711
03712
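    /* translate poll(2) revents bits into the RB_WAITFD_* result bits */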
03713 result = 0;
03714 if (fds.revents & POLLIN_SET)
03715 result |= RB_WAITFD_IN;
03716 if (fds.revents & POLLOUT_SET)
03717 result |= RB_WAITFD_OUT;
03718 if (fds.revents & POLLEX_SET)
03719 result |= RB_WAITFD_PRI;
03720
03721 return result;
03722 }
03723 #else
03724 static rb_fdset_t *
03725 init_set_fd(int fd, rb_fdset_t *fds)
03726 {
03727 rb_fd_init(fds);
03728 rb_fd_set(fd, fds);
03729
03730 return fds;
03731 }
03732
03733 struct select_args {
03734 union {
03735 int fd;
03736 int error;
03737 } as;
03738 rb_fdset_t *read;
03739 rb_fdset_t *write;
03740 rb_fdset_t *except;
03741 struct timeval *tv;
03742 };
03743
03744 static VALUE
03745 select_single(VALUE ptr)
03746 {
03747 struct select_args *args = (struct select_args *)ptr;
03748 int r;
03749
03750 r = rb_thread_fd_select(args->as.fd + 1,
03751 args->read, args->write, args->except, args->tv);
03752 if (r == -1)
03753 args->as.error = errno;
03754 if (r > 0) {
03755 r = 0;
03756 if (args->read && rb_fd_isset(args->as.fd, args->read))
03757 r |= RB_WAITFD_IN;
03758 if (args->write && rb_fd_isset(args->as.fd, args->write))
03759 r |= RB_WAITFD_OUT;
03760 if (args->except && rb_fd_isset(args->as.fd, args->except))
03761 r |= RB_WAITFD_PRI;
03762 }
03763 return (VALUE)r;
03764 }
03765
03766 static VALUE
03767 select_single_cleanup(VALUE ptr)
03768 {
03769 struct select_args *args = (struct select_args *)ptr;
03770
03771 if (args->read) rb_fd_term(args->read);
03772 if (args->write) rb_fd_term(args->write);
03773 if (args->except) rb_fd_term(args->except);
03774
03775 return (VALUE)-1;
03776 }
03777
03778 int
03779 rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
03780 {
03781 rb_fdset_t rfds, wfds, efds;
03782 struct select_args args;
03783 int r;
03784 VALUE ptr = (VALUE)&args;
03785
03786 args.as.fd = fd;
03787 args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
03788 args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
03789 args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
03790 args.tv = tv;
03791
03792 r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
03793 if (r == -1)
03794 errno = args.as.error;
03795
03796 return r;
03797 }
03798 #endif
03799
03800
03801
03802
03803
03804 #ifdef USE_CONSERVATIVE_STACK_END
03805 void
03806 rb_gc_set_stack_end(VALUE **stack_end_p)
03807 {
03808 VALUE stack_end;
03809 *stack_end_p = &stack_end;
03810 }
03811 #endif
03812
03813
03814
03815
03816
03817
03818 void
03819 rb_threadptr_check_signal(rb_thread_t *mth)
03820 {
03821
03822 if (rb_signal_buff_size() > 0) {
03823
03824 rb_threadptr_trap_interrupt(mth);
03825 }
03826 }
03827
03828 static void
03829 timer_thread_function(void *arg)
03830 {
03831 rb_vm_t *vm = GET_VM();
03832
03833
03834
03835
03836
03837
03838 native_mutex_lock(&vm->thread_destruct_lock);
03839
03840 if (vm->running_thread)
03841 RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread);
03842 native_mutex_unlock(&vm->thread_destruct_lock);
03843
03844
03845 rb_threadptr_check_signal(vm->main_thread);
03846
03847 #if 0
03848
03849 if (vm->prove_profile.enable) {
03850 rb_thread_t *th = vm->running_thread;
03851
03852 if (vm->during_gc) {
03853
03854 }
03855 }
03856 #endif
03857 }
03858
03859 void
03860 rb_thread_stop_timer_thread(int close_anyway)
03861 {
03862 if (timer_thread_id && native_stop_timer_thread(close_anyway)) {
03863 native_reset_timer_thread();
03864 }
03865 }
03866
03867 void
03868 rb_thread_reset_timer_thread(void)
03869 {
03870 native_reset_timer_thread();
03871 }
03872
03873 void
03874 rb_thread_start_timer_thread(void)
03875 {
03876 system_working = 1;
03877 rb_thread_create_timer_thread();
03878 }
03879
03880 static int
03881 clear_coverage_i(st_data_t key, st_data_t val, st_data_t dummy)
03882 {
03883 int i;
03884 VALUE lines = (VALUE)val;
03885
03886 for (i = 0; i < RARRAY_LEN(lines); i++) {
03887 if (RARRAY_AREF(lines, i) != Qnil) {
03888 RARRAY_ASET(lines, i, INT2FIX(0));
03889 }
03890 }
03891 return ST_CONTINUE;
03892 }
03893
03894 static void
03895 clear_coverage(void)
03896 {
03897 VALUE coverages = rb_get_coverages();
03898 if (RTEST(coverages)) {
03899 st_foreach(rb_hash_tbl_raw(coverages), clear_coverage_i, 0);
03900 }
03901 }
03902
03903 static void
03904 rb_thread_atfork_internal(int (*atfork)(st_data_t, st_data_t, st_data_t))
03905 {
03906 rb_thread_t *th = GET_THREAD();
03907 rb_vm_t *vm = th->vm;
03908 VALUE thval = th->self;
03909 vm->main_thread = th;
03910
03911 gvl_atfork(th->vm);
03912 st_foreach(vm->living_threads, atfork, (st_data_t)th);
03913 st_clear(vm->living_threads);
03914 st_insert(vm->living_threads, thval, (st_data_t)th->thread_id);
03915 vm->sleeper = 0;
03916 clear_coverage();
03917 }
03918
03919 static int
03920 terminate_atfork_i(st_data_t key, st_data_t val, st_data_t current_th)
03921 {
03922 VALUE thval = key;
03923 rb_thread_t *th;
03924 GetThreadPtr(thval, th);
03925
03926 if (th != (rb_thread_t *)current_th) {
03927 rb_mutex_abandon_keeping_mutexes(th);
03928 rb_mutex_abandon_locking_mutex(th);
03929 thread_cleanup_func(th, TRUE);
03930 }
03931 return ST_CONTINUE;
03932 }
03933
03934 void
03935 rb_thread_atfork(void)
03936 {
03937 rb_thread_atfork_internal(terminate_atfork_i);
03938 GET_THREAD()->join_list = NULL;
03939
03940
03941 rb_reset_random_seed();
03942 }
03943
03944 static int
03945 terminate_atfork_before_exec_i(st_data_t key, st_data_t val, st_data_t current_th)
03946 {
03947 VALUE thval = key;
03948 rb_thread_t *th;
03949 GetThreadPtr(thval, th);
03950
03951 if (th != (rb_thread_t *)current_th) {
03952 thread_cleanup_func_before_exec(th);
03953 }
03954 return ST_CONTINUE;
03955 }
03956
03957 void
03958 rb_thread_atfork_before_exec(void)
03959 {
03960 rb_thread_atfork_internal(terminate_atfork_before_exec_i);
03961 }
03962
03963 struct thgroup {
03964 int enclosed;
03965 VALUE group;
03966 };
03967
03968 static size_t
03969 thgroup_memsize(const void *ptr)
03970 {
03971 return ptr ? sizeof(struct thgroup) : 0;
03972 }
03973
03974 static const rb_data_type_t thgroup_data_type = {
03975 "thgroup",
03976 {NULL, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,},
03977 NULL, NULL, RUBY_TYPED_FREE_IMMEDIATELY
03978 };
03979
03980
03981
03982
03983
03984
03985
03986
03987
03988
03989
03990
03991
03992
03993
03994
03995
03996
03997
03998
03999 static VALUE
04000 thgroup_s_alloc(VALUE klass)
04001 {
04002 VALUE group;
04003 struct thgroup *data;
04004
04005 group = TypedData_Make_Struct(klass, struct thgroup, &thgroup_data_type, data);
04006 data->enclosed = 0;
04007 data->group = group;
04008
04009 return group;
04010 }
04011
04012 struct thgroup_list_params {
04013 VALUE ary;
04014 VALUE group;
04015 };
04016
04017 static int
04018 thgroup_list_i(st_data_t key, st_data_t val, st_data_t data)
04019 {
04020 VALUE thread = (VALUE)key;
04021 VALUE ary = ((struct thgroup_list_params *)data)->ary;
04022 VALUE group = ((struct thgroup_list_params *)data)->group;
04023 rb_thread_t *th;
04024 GetThreadPtr(thread, th);
04025
04026 if (th->thgroup == group) {
04027 rb_ary_push(ary, thread);
04028 }
04029 return ST_CONTINUE;
04030 }
04031
04032
04033
04034
04035
04036
04037
04038
04039
04040
04041 static VALUE
04042 thgroup_list(VALUE group)
04043 {
04044 VALUE ary = rb_ary_new();
04045 struct thgroup_list_params param;
04046
04047 param.ary = ary;
04048 param.group = group;
04049 st_foreach(GET_THREAD()->vm->living_threads, thgroup_list_i, (st_data_t) & param);
04050 return ary;
04051 }
04052
04053
04054
04055
04056
04057
04058
04059
04060
04061
04062
04063
04064
04065
04066
04067
04068
04069
04070 static VALUE
04071 thgroup_enclose(VALUE group)
04072 {
04073 struct thgroup *data;
04074
04075 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
04076 data->enclosed = 1;
04077
04078 return group;
04079 }
04080
04081
04082
04083
04084
04085
04086
04087
04088
04089 static VALUE
04090 thgroup_enclosed_p(VALUE group)
04091 {
04092 struct thgroup *data;
04093
04094 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
04095 if (data->enclosed)
04096 return Qtrue;
04097 return Qfalse;
04098 }
04099
04100
04101
04102
04103
04104
04105
04106
04107
04108
04109
04110
04111
04112
04113
04114
04115
04116
04117
04118
04119
04120
04121
04122
04123
04124
04125
04126
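/*
 *  call-seq:
 *     thgrp.add(thread)   -> thgrp
 *
 *  Adds the given +thread+ to this group, removing it from any other group
 *  to which it previously belonged.  Raises a ThreadError if either the
 *  source or the destination group is frozen or enclosed.
 */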
04127 static VALUE
04128 thgroup_add(VALUE group, VALUE thread)
04129 {
04130 rb_thread_t *th;
04131 struct thgroup *data;
04132
04133 GetThreadPtr(thread, th);
04134
04135 if (OBJ_FROZEN(group)) {
04136 rb_raise(rb_eThreadError, "can't move to the frozen thread group");
04137 }
04138 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
04139 if (data->enclosed) {
04140 rb_raise(rb_eThreadError, "can't move to the enclosed thread group");
04141 }
04142
04143 if (!th->thgroup) {
04144 return Qnil;
04145 }
04146
04147 if (OBJ_FROZEN(th->thgroup)) {
04148 rb_raise(rb_eThreadError, "can't move from the frozen thread group");
04149 }
04150 TypedData_Get_Struct(th->thgroup, struct thgroup, &thgroup_data_type, data);
04151 if (data->enclosed) {
04152 rb_raise(rb_eThreadError,
04153 "can't move from the enclosed thread group");
04154 }
04155
04156 th->thgroup = group;
04157 return group;
04158 }
04159
04160
04161
04162
04163
04164
04165
04166
04167
04168
04169
04170
04171
04172
04173
04174
04175
04176
04177
04178
04179
04180
04181
04182
04183
04184
04185
04186 #define GetMutexPtr(obj, tobj) \
04187 TypedData_Get_Struct((obj), rb_mutex_t, &mutex_data_type, (tobj))
04188
04189 #define mutex_mark NULL
04190
04191 static void
04192 mutex_free(void *ptr)
04193 {
04194 if (ptr) {
04195 rb_mutex_t *mutex = ptr;
04196 if (mutex->th) {
04197
04198 const char *err = rb_mutex_unlock_th(mutex, mutex->th);
04199 if (err) rb_bug("%s", err);
04200 }
04201 native_mutex_destroy(&mutex->lock);
04202 native_cond_destroy(&mutex->cond);
04203 }
04204 ruby_xfree(ptr);
04205 }
04206
04207 static size_t
04208 mutex_memsize(const void *ptr)
04209 {
04210 return ptr ? sizeof(rb_mutex_t) : 0;
04211 }
04212
04213 static const rb_data_type_t mutex_data_type = {
04214 "mutex",
04215 {mutex_mark, mutex_free, mutex_memsize,},
04216 NULL, NULL, RUBY_TYPED_FREE_IMMEDIATELY
04217 };
04218
04219 VALUE
04220 rb_obj_is_mutex(VALUE obj)
04221 {
04222 if (rb_typeddata_is_kind_of(obj, &mutex_data_type)) {
04223 return Qtrue;
04224 }
04225 else {
04226 return Qfalse;
04227 }
04228 }
04229
04230 static VALUE
04231 mutex_alloc(VALUE klass)
04232 {
04233 VALUE volatile obj;
04234 rb_mutex_t *mutex;
04235
04236 obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
04237 native_mutex_initialize(&mutex->lock);
04238 native_cond_initialize(&mutex->cond, RB_CONDATTR_CLOCK_MONOTONIC);
04239 return obj;
04240 }
04241
04242
04243
04244
04245
04246
04247
04248 static VALUE
04249 mutex_initialize(VALUE self)
04250 {
04251 return self;
04252 }
04253
04254 VALUE
04255 rb_mutex_new(void)
04256 {
04257 return mutex_alloc(rb_cMutex);
04258 }
04259
04260
04261
04262
04263
04264
04265
04266 VALUE
04267 rb_mutex_locked_p(VALUE self)
04268 {
04269 rb_mutex_t *mutex;
04270 GetMutexPtr(self, mutex);
04271 return mutex->th ? Qtrue : Qfalse;
04272 }
04273
04274 static void
04275 mutex_locked(rb_thread_t *th, VALUE self)
04276 {
04277 rb_mutex_t *mutex;
04278 GetMutexPtr(self, mutex);
04279
04280 if (th->keeping_mutexes) {
04281 mutex->next_mutex = th->keeping_mutexes;
04282 }
04283 th->keeping_mutexes = mutex;
04284 }
04285
04286
04287
04288
04289
04290
04291
04292
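/*
 * call-seq:
 *    mutex.try_lock  -> true or false
 *
 * Attempts to obtain the lock and returns immediately.  Returns +true+ if
 * the lock was granted, +false+ otherwise.
 */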
04293 VALUE
04294 rb_mutex_trylock(VALUE self)
04295 {
04296 rb_mutex_t *mutex;
04297 VALUE locked = Qfalse;
04298 GetMutexPtr(self, mutex);
04299
04300 native_mutex_lock(&mutex->lock);
04301 if (mutex->th == 0) {
04302 mutex->th = GET_THREAD();
04303 locked = Qtrue;
04304
04305 mutex_locked(GET_THREAD(), self);
04306 }
04307 native_mutex_unlock(&mutex->lock);
04308
04309 return locked;
04310 }
04311
04312 static int
04313 lock_func(rb_thread_t *th, rb_mutex_t *mutex, int timeout_ms)
04314 {
04315 int interrupted = 0;
04316 int err = 0;
04317
04318 mutex->cond_waiting++;
04319 for (;;) {
04320 if (!mutex->th) {
04321 mutex->th = th;
04322 break;
04323 }
04324 if (RUBY_VM_INTERRUPTED(th)) {
04325 interrupted = 1;
04326 break;
04327 }
04328 if (err == ETIMEDOUT) {
04329 interrupted = 2;
04330 break;
04331 }
04332
04333 if (timeout_ms) {
04334 struct timespec timeout_rel;
04335 struct timespec timeout;
04336
04337 timeout_rel.tv_sec = 0;
04338 timeout_rel.tv_nsec = timeout_ms * 1000 * 1000;
04339 timeout = native_cond_timeout(&mutex->cond, timeout_rel);
04340 err = native_cond_timedwait(&mutex->cond, &mutex->lock, &timeout);
04341 }
04342 else {
04343 native_cond_wait(&mutex->cond, &mutex->lock);
04344 err = 0;
04345 }
04346 }
04347 mutex->cond_waiting--;
04348
04349 return interrupted;
04350 }
04351
04352 static void
04353 lock_interrupt(void *ptr)
04354 {
04355 rb_mutex_t *mutex = (rb_mutex_t *)ptr;
04356 native_mutex_lock(&mutex->lock);
04357 if (mutex->cond_waiting > 0)
04358 native_cond_broadcast(&mutex->cond);
04359 native_mutex_unlock(&mutex->lock);
04360 }
04361
04362
04363
04364
04365
04366
04367 static const rb_thread_t *patrol_thread = NULL;
04368
04369
04370
04371
04372
04373
04374
04375
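/*
 * call-seq:
 *    mutex.lock  -> self
 *
 * Attempts to grab the lock and waits if it isn't available.  Raises
 * ThreadError on recursive locking and, for ordinary mutexes, when called
 * from a trap handler.
 */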
04376 VALUE
04377 rb_mutex_lock(VALUE self)
04378 {
04379 rb_thread_t *th = GET_THREAD();
04380 rb_mutex_t *mutex;
04381 GetMutexPtr(self, mutex);
04382
04383
04384 if (!mutex->allow_trap && th->interrupt_mask & TRAP_INTERRUPT_MASK) {
04385 rb_raise(rb_eThreadError, "can't be called from trap context");
04386 }
04387
04388 if (rb_mutex_trylock(self) == Qfalse) {
04389 if (mutex->th == GET_THREAD()) {
04390 rb_raise(rb_eThreadError, "deadlock; recursive locking");
04391 }
04392
04393 while (mutex->th != th) {
04394 int interrupted;
04395 enum rb_thread_status prev_status = th->status;
04396 volatile int timeout_ms = 0;
04397 struct rb_unblock_callback oldubf;
04398
04399 set_unblock_function(th, lock_interrupt, mutex, &oldubf, FALSE);
04400 th->status = THREAD_STOPPED_FOREVER;
04401 th->locking_mutex = self;
04402
04403 native_mutex_lock(&mutex->lock);
04404 th->vm->sleeper++;
04405
04406
04407
04408
04409
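	    /*
	     * If every living thread is sleeping and no patrol thread is
	     * active, wait with a short timeout and act as the patrol
	     * thread so that deadlocks can still be detected.
	     */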
04410 if ((vm_living_thread_num(th->vm) == th->vm->sleeper) &&
04411 !patrol_thread) {
04412 timeout_ms = 100;
04413 patrol_thread = th;
04414 }
04415
04416 GVL_UNLOCK_BEGIN();
04417 interrupted = lock_func(th, mutex, (int)timeout_ms);
04418 native_mutex_unlock(&mutex->lock);
04419 GVL_UNLOCK_END();
04420
04421 if (patrol_thread == th)
04422 patrol_thread = NULL;
04423
04424 reset_unblock_function(th, &oldubf);
04425
04426 th->locking_mutex = Qfalse;
04427 if (mutex->th && interrupted == 2) {
04428 rb_check_deadlock(th->vm);
04429 }
04430 if (th->status == THREAD_STOPPED_FOREVER) {
04431 th->status = prev_status;
04432 }
04433 th->vm->sleeper--;
04434
04435 if (mutex->th == th) mutex_locked(th, self);
04436
04437 if (interrupted) {
04438 RUBY_VM_CHECK_INTS_BLOCKING(th);
04439 }
04440 }
04441 }
04442 return self;
04443 }
04444
04445
04446
04447
04448
04449
04450
04451
04452 VALUE
04453 rb_mutex_owned_p(VALUE self)
04454 {
04455 VALUE owned = Qfalse;
04456 rb_thread_t *th = GET_THREAD();
04457 rb_mutex_t *mutex;
04458
04459 GetMutexPtr(self, mutex);
04460
04461 if (mutex->th == th)
04462 owned = Qtrue;
04463
04464 return owned;
04465 }
04466
04467 static const char *
04468 rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th)
04469 {
04470 const char *err = NULL;
04471
04472 native_mutex_lock(&mutex->lock);
04473
04474 if (mutex->th == 0) {
04475 err = "Attempt to unlock a mutex which is not locked";
04476 }
04477 else if (mutex->th != th) {
04478 err = "Attempt to unlock a mutex which is locked by another thread";
04479 }
04480 else {
04481 mutex->th = 0;
04482 if (mutex->cond_waiting > 0)
04483 native_cond_signal(&mutex->cond);
04484 }
04485
04486 native_mutex_unlock(&mutex->lock);
04487
04488 if (!err) {
04489 rb_mutex_t *volatile *th_mutex = &th->keeping_mutexes;
04490 while (*th_mutex != mutex) {
04491 th_mutex = &(*th_mutex)->next_mutex;
04492 }
04493 *th_mutex = mutex->next_mutex;
04494 mutex->next_mutex = NULL;
04495 }
04496
04497 return err;
04498 }
04499
04500
04501
04502
04503
04504
04505
04506
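/*
 * call-seq:
 *    mutex.unlock    -> self
 *
 * Releases the lock.  Raises ThreadError if the mutex wasn't locked by the
 * current thread.
 */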
04507 VALUE
04508 rb_mutex_unlock(VALUE self)
04509 {
04510 const char *err;
04511 rb_mutex_t *mutex;
04512 GetMutexPtr(self, mutex);
04513
04514 err = rb_mutex_unlock_th(mutex, GET_THREAD());
04515 if (err) rb_raise(rb_eThreadError, "%s", err);
04516
04517 return self;
04518 }
04519
04520 static void
04521 rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
04522 {
04523 if (th->keeping_mutexes) {
04524 rb_mutex_abandon_all(th->keeping_mutexes);
04525 }
04526 th->keeping_mutexes = NULL;
04527 }
04528
04529 static void
04530 rb_mutex_abandon_locking_mutex(rb_thread_t *th)
04531 {
04532 rb_mutex_t *mutex;
04533
04534 if (!th->locking_mutex) return;
04535
04536 GetMutexPtr(th->locking_mutex, mutex);
04537 if (mutex->th == th)
04538 rb_mutex_abandon_all(mutex);
04539 th->locking_mutex = Qfalse;
04540 }
04541
04542 static void
04543 rb_mutex_abandon_all(rb_mutex_t *mutexes)
04544 {
04545 rb_mutex_t *mutex;
04546
04547 while (mutexes) {
04548 mutex = mutexes;
04549 mutexes = mutex->next_mutex;
04550 mutex->th = 0;
04551 mutex->next_mutex = 0;
04552 }
04553 }
04554
04555 static VALUE
04556 rb_mutex_sleep_forever(VALUE time)
04557 {
04558 sleep_forever(GET_THREAD(), 1, 0);
04559 return Qnil;
04560 }
04561
04562 static VALUE
04563 rb_mutex_wait_for(VALUE time)
04564 {
04565 struct timeval *t = (struct timeval *)time;
04566 sleep_timeval(GET_THREAD(), *t, 0);
04567 return Qnil;
04568 }
04569
04570 VALUE
04571 rb_mutex_sleep(VALUE self, VALUE timeout)
04572 {
04573 time_t beg, end;
04574 struct timeval t;
04575
04576 if (!NIL_P(timeout)) {
04577 t = rb_time_interval(timeout);
04578 }
04579 rb_mutex_unlock(self);
04580 beg = time(0);
04581 if (NIL_P(timeout)) {
04582 rb_ensure(rb_mutex_sleep_forever, Qnil, rb_mutex_lock, self);
04583 }
04584 else {
04585 rb_ensure(rb_mutex_wait_for, (VALUE)&t, rb_mutex_lock, self);
04586 }
04587 end = time(0) - beg;
04588 return INT2FIX(end);
04589 }
04590
04591
04592
04593
04594
04595
04596
04597
04598
04599
04600
04601
04602
04603
04604
04605 static VALUE
04606 mutex_sleep(int argc, VALUE *argv, VALUE self)
04607 {
04608 VALUE timeout;
04609
04610 rb_scan_args(argc, argv, "01", &timeout);
04611 return rb_mutex_sleep(self, timeout);
04612 }
04613
04614
04615
04616
04617
04618
04619
04620
04621
04622 VALUE
04623 rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg)
04624 {
04625 rb_mutex_lock(mutex);
04626 return rb_ensure(func, arg, rb_mutex_unlock, mutex);
04627 }
04628
04629
04630
04631
04632
04633
04634
04635
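/*
 * call-seq:
 *    mutex.synchronize { ... }    -> result of the block
 *
 * Obtains the lock, runs the block, and releases the lock when the block
 * completes.  Returns the value of the block.  Raises ThreadError when
 * called without a block.
 */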
04636 static VALUE
04637 rb_mutex_synchronize_m(VALUE self, VALUE args)
04638 {
04639 if (!rb_block_given_p()) {
04640 rb_raise(rb_eThreadError, "must be called with a block");
04641 }
04642
04643 return rb_mutex_synchronize(self, rb_yield, Qundef);
04644 }
04645
04646 void rb_mutex_allow_trap(VALUE self, int val)
04647 {
04648 rb_mutex_t *m;
04649 GetMutexPtr(self, m);
04650
04651 m->allow_trap = val;
04652 }
04653
04654
04655
04656
04657 static void
04658 thread_shield_mark(void *ptr)
04659 {
04660 rb_gc_mark((VALUE)ptr);
04661 }
04662
04663 static const rb_data_type_t thread_shield_data_type = {
04664 "thread_shield",
04665 {thread_shield_mark, 0, 0,},
04666 NULL, NULL, RUBY_TYPED_FREE_IMMEDIATELY
04667 };
04668
04669 static VALUE
04670 thread_shield_alloc(VALUE klass)
04671 {
04672 return TypedData_Wrap_Struct(klass, &thread_shield_data_type, (void *)mutex_alloc(0));
04673 }
04674
04675 #define GetThreadShieldPtr(obj) ((VALUE)rb_check_typeddata((obj), &thread_shield_data_type))
04676 #define THREAD_SHIELD_WAITING_MASK (FL_USER0|FL_USER1|FL_USER2|FL_USER3|FL_USER4|FL_USER5|FL_USER6|FL_USER7|FL_USER8|FL_USER9|FL_USER10|FL_USER11|FL_USER12|FL_USER13|FL_USER14|FL_USER15|FL_USER16|FL_USER17|FL_USER18|FL_USER19)
04677 #define THREAD_SHIELD_WAITING_SHIFT (FL_USHIFT)
04678 #define rb_thread_shield_waiting(b) (int)((RBASIC(b)->flags&THREAD_SHIELD_WAITING_MASK)>>THREAD_SHIELD_WAITING_SHIFT)
04679
04680 static inline void
04681 rb_thread_shield_waiting_inc(VALUE b)
04682 {
04683 unsigned int w = rb_thread_shield_waiting(b);
04684 w++;
04685 if (w > (unsigned int)(THREAD_SHIELD_WAITING_MASK>>THREAD_SHIELD_WAITING_SHIFT))
04686 rb_raise(rb_eRuntimeError, "waiting count overflow");
04687 RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK;
04688 RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT);
04689 }
04690
04691 static inline void
04692 rb_thread_shield_waiting_dec(VALUE b)
04693 {
04694 unsigned int w = rb_thread_shield_waiting(b);
04695 if (!w) rb_raise(rb_eRuntimeError, "waiting count underflow");
04696 w--;
04697 RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK;
04698 RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT);
04699 }
04700
04701 VALUE
04702 rb_thread_shield_new(void)
04703 {
04704 VALUE thread_shield = thread_shield_alloc(rb_cThreadShield);
04705 rb_mutex_lock((VALUE)DATA_PTR(thread_shield));
04706 return thread_shield;
04707 }
04708
04709
04710
04711
04712
04713
04714
04715
04716
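/*
 * Waits on the thread shield.  Returns +true+ if the shield was acquired,
 * +false+ if the shield was destroyed and no other threads are waiting,
 * and +nil+ if the shield was destroyed while still in use (or is already
 * held by the calling thread).
 */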
04717 VALUE
04718 rb_thread_shield_wait(VALUE self)
04719 {
04720 VALUE mutex = GetThreadShieldPtr(self);
04721 rb_mutex_t *m;
04722
04723 if (!mutex) return Qfalse;
04724 GetMutexPtr(mutex, m);
04725 if (m->th == GET_THREAD()) return Qnil;
04726 rb_thread_shield_waiting_inc(self);
04727 rb_mutex_lock(mutex);
04728 rb_thread_shield_waiting_dec(self);
04729 if (DATA_PTR(self)) return Qtrue;
04730 rb_mutex_unlock(mutex);
04731 return rb_thread_shield_waiting(self) > 0 ? Qnil : Qfalse;
04732 }
04733
04734
04735
04736
04737 VALUE
04738 rb_thread_shield_release(VALUE self)
04739 {
04740 VALUE mutex = GetThreadShieldPtr(self);
04741 rb_mutex_unlock(mutex);
04742 return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse;
04743 }
04744
04745
04746
04747
04748 VALUE
04749 rb_thread_shield_destroy(VALUE self)
04750 {
04751 VALUE mutex = GetThreadShieldPtr(self);
04752 DATA_PTR(self) = 0;
04753 rb_mutex_unlock(mutex);
04754 return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse;
04755 }
04756
04757
04758 static ID recursive_key;
04759
04760 extern const struct st_hash_type st_hashtype_num;
04761
04762 static VALUE
04763 ident_hash_new(void)
04764 {
04765 VALUE hash = rb_hash_new();
04766 rb_hash_tbl_raw(hash)->type = &st_hashtype_num;
04767 return hash;
04768 }
04769
04770 ID rb_frame_last_func(void);
04771
04772
04773
04774
04775
04776
04777
04778 static VALUE
04779 recursive_list_access(VALUE sym)
04780 {
04781 volatile VALUE hash = rb_thread_local_aref(rb_thread_current(), recursive_key);
04782 VALUE list;
04783 if (NIL_P(hash) || !RB_TYPE_P(hash, T_HASH)) {
04784 hash = ident_hash_new();
04785 rb_thread_local_aset(rb_thread_current(), recursive_key, hash);
04786 list = Qnil;
04787 }
04788 else {
04789 list = rb_hash_aref(hash, sym);
04790 }
04791 if (NIL_P(list) || !RB_TYPE_P(list, T_HASH)) {
04792 list = ident_hash_new();
04793 rb_hash_aset(hash, sym, list);
04794 }
04795 return list;
04796 }
04797
04798 VALUE
04799 rb_threadptr_reset_recursive_data(rb_thread_t *th)
04800 {
04801 VALUE old = threadptr_local_aref(th, recursive_key);
04802 threadptr_local_aset(th, recursive_key, Qnil);
04803 return old;
04804 }
04805
04806 void
04807 rb_threadptr_restore_recursive_data(rb_thread_t *th, VALUE old)
04808 {
04809 threadptr_local_aset(th, recursive_key, old);
04810 }
04811
04812
04813
04814
04815
04816
04817
04818 static VALUE
04819 recursive_check(VALUE list, VALUE obj_id, VALUE paired_obj_id)
04820 {
04821 #if SIZEOF_LONG == SIZEOF_VOIDP
04822 #define OBJ_ID_EQL(obj_id, other) ((obj_id) == (other))
04823 #elif SIZEOF_LONG_LONG == SIZEOF_VOIDP
04824 #define OBJ_ID_EQL(obj_id, other) (RB_TYPE_P((obj_id), T_BIGNUM) ? \
04825 rb_big_eql((obj_id), (other)) : ((obj_id) == (other)))
04826 #endif
04827
04828 VALUE pair_list = rb_hash_lookup2(list, obj_id, Qundef);
04829 if (pair_list == Qundef)
04830 return Qfalse;
04831 if (paired_obj_id) {
04832 if (!RB_TYPE_P(pair_list, T_HASH)) {
04833 if (!OBJ_ID_EQL(paired_obj_id, pair_list))
04834 return Qfalse;
04835 }
04836 else {
04837 if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id)))
04838 return Qfalse;
04839 }
04840 }
04841 return Qtrue;
04842 }
04843
04844
04845
04846
04847
04848
04849
04850
04851
04852
04853 static void
04854 recursive_push(VALUE list, VALUE obj, VALUE paired_obj)
04855 {
04856 VALUE pair_list;
04857
04858 if (!paired_obj) {
04859 rb_hash_aset(list, obj, Qtrue);
04860 }
04861 else if ((pair_list = rb_hash_lookup2(list, obj, Qundef)) == Qundef) {
04862 rb_hash_aset(list, obj, paired_obj);
04863 }
04864 else {
04865 if (!RB_TYPE_P(pair_list, T_HASH)){
04866 VALUE other_paired_obj = pair_list;
04867 pair_list = rb_hash_new();
04868 rb_hash_aset(pair_list, other_paired_obj, Qtrue);
04869 rb_hash_aset(list, obj, pair_list);
04870 }
04871 rb_hash_aset(pair_list, paired_obj, Qtrue);
04872 }
04873 }
04874
04875
04876
04877
04878
04879
04880
04881
04882
04883 static int
04884 recursive_pop(VALUE list, VALUE obj, VALUE paired_obj)
04885 {
04886 if (paired_obj) {
04887 VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
04888 if (pair_list == Qundef) {
04889 return 0;
04890 }
04891 if (RB_TYPE_P(pair_list, T_HASH)) {
04892 rb_hash_delete(pair_list, paired_obj);
04893 if (!RHASH_EMPTY_P(pair_list)) {
04894 return 1;
04895 }
04896 }
04897 }
04898 rb_hash_delete(list, obj);
04899 return 1;
04900 }
04901
04902 struct exec_recursive_params {
04903 VALUE (*func) (VALUE, VALUE, int);
04904 VALUE list;
04905 VALUE obj;
04906 VALUE objid;
04907 VALUE pairid;
04908 VALUE arg;
04909 };
04910
04911 static VALUE
04912 exec_recursive_i(RB_BLOCK_CALL_FUNC_ARGLIST(tag, data))
04913 {
04914 struct exec_recursive_params *p = (void *)data;
04915 return (*p->func)(p->obj, p->arg, FALSE);
04916 }
04917
04918
04919
04920
04921
04922
04923
04924
04925
04926
04927
04928
04929 static VALUE
04930 exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer)
04931 {
04932 VALUE result = Qundef;
04933 const ID mid = rb_frame_last_func();
04934 const VALUE sym = mid ? ID2SYM(mid) : ID2SYM(idNULL);
04935 struct exec_recursive_params p;
04936 int outermost;
04937 p.list = recursive_list_access(sym);
04938 p.objid = rb_obj_id(obj);
04939 p.obj = obj;
04940 p.pairid = pairid;
04941 p.arg = arg;
04942 outermost = outer && !recursive_check(p.list, ID2SYM(recursive_key), 0);
04943
04944 if (recursive_check(p.list, p.objid, pairid)) {
04945 if (outer && !outermost) {
04946 rb_throw_obj(p.list, p.list);
04947 }
04948 return (*func)(obj, arg, TRUE);
04949 }
04950 else {
04951 int state;
04952
04953 p.func = func;
04954
04955 if (outermost) {
04956 recursive_push(p.list, ID2SYM(recursive_key), 0);
04957 recursive_push(p.list, p.objid, p.pairid);
04958 result = rb_catch_protect(p.list, exec_recursive_i, (VALUE)&p, &state);
04959 if (!recursive_pop(p.list, p.objid, p.pairid)) goto invalid;
04960 if (!recursive_pop(p.list, ID2SYM(recursive_key), 0)) goto invalid;
04961 if (state) JUMP_TAG(state);
04962 if (result == p.list) {
04963 result = (*func)(obj, arg, TRUE);
04964 }
04965 }
04966 else {
04967 recursive_push(p.list, p.objid, p.pairid);
04968 PUSH_TAG();
04969 if ((state = EXEC_TAG()) == 0) {
04970 result = (*func)(obj, arg, FALSE);
04971 }
04972 POP_TAG();
04973 if (!recursive_pop(p.list, p.objid, p.pairid)) {
04974 invalid:
04975 rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list "
04976 "for %+"PRIsVALUE" in %+"PRIsVALUE,
04977 sym, rb_thread_current());
04978 }
04979 if (state) JUMP_TAG(state);
04980 }
04981 }
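    /* Read p through a volatile lvalue so the compiler treats it as used
     * across the setjmp/longjmp performed by the tag handling above. */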
04982 *(volatile struct exec_recursive_params *)&p;
04983 return result;
04984 }
04985
04986
04987
04988
04989
04990
04991 VALUE
04992 rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
04993 {
04994 return exec_recursive(func, obj, 0, arg, 0);
04995 }
04996
04997
04998
04999
05000
05001
05002 VALUE
05003 rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
05004 {
05005 return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 0);
05006 }
05007
05008
05009
05010
05011
05012
05013
05014 VALUE
05015 rb_exec_recursive_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
05016 {
05017 return exec_recursive(func, obj, 0, arg, 1);
05018 }
05019
05020
05021
05022
05023
05024
05025
05026 VALUE
05027 rb_exec_recursive_paired_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
05028 {
05029 return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 1);
05030 }
05031
05032
05033
05034
05035
05036
05037
05038
05039
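/*
 *  call-seq:
 *     thr.backtrace    -> array or nil
 *
 *  Returns the current backtrace of the target thread, or +nil+ if the
 *  thread is dead.
 */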
05040 static VALUE
05041 rb_thread_backtrace_m(int argc, VALUE *argv, VALUE thval)
05042 {
05043 return rb_vm_thread_backtrace(argc, argv, thval);
05044 }
05045
05046
05047
05048
05049
05050
05051
05052
05053
05054
05055
05056
05057 static VALUE
05058 rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval)
05059 {
05060 return rb_vm_thread_backtrace_locations(argc, argv, thval);
05061 }
05062
05063
05064
05065
05066
05067
05068
05069
05070
05071
05072
05073
05074
05075
05076
05077
05078 void
05079 Init_Thread(void)
05080 {
05081 #undef rb_intern
05082 #define rb_intern(str) rb_intern_const(str)
05083
05084 VALUE cThGroup;
05085 rb_thread_t *th = GET_THREAD();
05086
05087 sym_never = ID2SYM(rb_intern("never"));
05088 sym_immediate = ID2SYM(rb_intern("immediate"));
05089 sym_on_blocking = ID2SYM(rb_intern("on_blocking"));
05090 id_locals = rb_intern("locals");
05091
05092 rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
05093 rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
05094 rb_define_singleton_method(rb_cThread, "fork", thread_start, -2);
05095 rb_define_singleton_method(rb_cThread, "main", rb_thread_s_main, 0);
05096 rb_define_singleton_method(rb_cThread, "current", thread_s_current, 0);
05097 rb_define_singleton_method(rb_cThread, "stop", rb_thread_stop, 0);
05098 rb_define_singleton_method(rb_cThread, "kill", rb_thread_s_kill, 1);
05099 rb_define_singleton_method(rb_cThread, "exit", rb_thread_exit, 0);
05100 rb_define_singleton_method(rb_cThread, "pass", thread_s_pass, 0);
05101 rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0);
05102 rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0);
05103 rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1);
05104 #if THREAD_DEBUG < 0
05105 rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0);
05106 rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1);
05107 #endif
05108 rb_define_singleton_method(rb_cThread, "handle_interrupt", rb_thread_s_handle_interrupt, 1);
05109 rb_define_singleton_method(rb_cThread, "pending_interrupt?", rb_thread_s_pending_interrupt_p, -1);
05110 rb_define_method(rb_cThread, "pending_interrupt?", rb_thread_pending_interrupt_p, -1);
05111
05112 rb_define_method(rb_cThread, "initialize", thread_initialize, -2);
05113 rb_define_method(rb_cThread, "raise", thread_raise_m, -1);
05114 rb_define_method(rb_cThread, "join", thread_join_m, -1);
05115 rb_define_method(rb_cThread, "value", thread_value, 0);
05116 rb_define_method(rb_cThread, "kill", rb_thread_kill, 0);
05117 rb_define_method(rb_cThread, "terminate", rb_thread_kill, 0);
05118 rb_define_method(rb_cThread, "exit", rb_thread_kill, 0);
05119 rb_define_method(rb_cThread, "run", rb_thread_run, 0);
05120 rb_define_method(rb_cThread, "wakeup", rb_thread_wakeup, 0);
05121 rb_define_method(rb_cThread, "[]", rb_thread_aref, 1);
05122 rb_define_method(rb_cThread, "[]=", rb_thread_aset, 2);
05123 rb_define_method(rb_cThread, "key?", rb_thread_key_p, 1);
05124 rb_define_method(rb_cThread, "keys", rb_thread_keys, 0);
05125 rb_define_method(rb_cThread, "priority", rb_thread_priority, 0);
05126 rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1);
05127 rb_define_method(rb_cThread, "status", rb_thread_status, 0);
05128 rb_define_method(rb_cThread, "thread_variable_get", rb_thread_variable_get, 1);
05129 rb_define_method(rb_cThread, "thread_variable_set", rb_thread_variable_set, 2);
05130 rb_define_method(rb_cThread, "thread_variables", rb_thread_variables, 0);
05131 rb_define_method(rb_cThread, "thread_variable?", rb_thread_variable_p, 1);
05132 rb_define_method(rb_cThread, "alive?", rb_thread_alive_p, 0);
05133 rb_define_method(rb_cThread, "stop?", rb_thread_stop_p, 0);
05134 rb_define_method(rb_cThread, "abort_on_exception", rb_thread_abort_exc, 0);
05135 rb_define_method(rb_cThread, "abort_on_exception=", rb_thread_abort_exc_set, 1);
05136 rb_define_method(rb_cThread, "safe_level", rb_thread_safe_level, 0);
05137 rb_define_method(rb_cThread, "group", rb_thread_group, 0);
05138 rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1);
05139 rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1);
05140
05141 rb_define_method(rb_cThread, "inspect", rb_thread_inspect, 0);
05142
05143 closed_stream_error = rb_exc_new2(rb_eIOError, "stream closed");
05144 OBJ_TAINT(closed_stream_error);
05145 OBJ_FREEZE(closed_stream_error);
05146
05147 cThGroup = rb_define_class("ThreadGroup", rb_cObject);
05148 rb_define_alloc_func(cThGroup, thgroup_s_alloc);
05149 rb_define_method(cThGroup, "list", thgroup_list, 0);
05150 rb_define_method(cThGroup, "enclose", thgroup_enclose, 0);
05151 rb_define_method(cThGroup, "enclosed?", thgroup_enclosed_p, 0);
05152 rb_define_method(cThGroup, "add", thgroup_add, 1);
05153
05154 {
05155 th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup);
05156 rb_define_const(cThGroup, "Default", th->thgroup);
05157 }
05158
05159 rb_cMutex = rb_define_class("Mutex", rb_cObject);
05160 rb_define_alloc_func(rb_cMutex, mutex_alloc);
05161 rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
05162 rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
05163 rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
05164 rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
05165 rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
05166 rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
05167 rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0);
05168 rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);
05169
05170 recursive_key = rb_intern("__recursive_key__");
05171 rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError);
05172
05173
05174 {
05175
05176 {
05177
05178 gvl_init(th->vm);
05179 gvl_acquire(th->vm, th);
05180 native_mutex_initialize(&th->vm->thread_destruct_lock);
05181 native_mutex_initialize(&th->interrupt_lock);
05182 native_cond_initialize(&th->interrupt_cond,
05183 RB_CONDATTR_CLOCK_MONOTONIC);
05184
05185 th->pending_interrupt_queue = rb_ary_tmp_new(0);
05186 th->pending_interrupt_queue_checked = 0;
05187 th->pending_interrupt_mask_stack = rb_ary_tmp_new(0);
05188
05189 th->interrupt_mask = 0;
05190 }
05191 }
05192
05193 rb_thread_create_timer_thread();
05194
05195
05196 (void)native_mutex_trylock;
05197 }
05198
05199 int
05200 ruby_native_thread_p(void)
05201 {
05202 rb_thread_t *th = ruby_thread_from_native();
05203
05204 return th != 0;
05205 }
05206
05207 static int
05208 check_deadlock_i(st_data_t key, st_data_t val, int *found)
05209 {
05210 VALUE thval = key;
05211 rb_thread_t *th;
05212 GetThreadPtr(thval, th);
05213
05214 if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th)) {
05215 *found = 1;
05216 }
05217 else if (th->locking_mutex) {
05218 rb_mutex_t *mutex;
05219 GetMutexPtr(th->locking_mutex, mutex);
05220
05221 native_mutex_lock(&mutex->lock);
05222 if (mutex->th == th || (!mutex->th && mutex->cond_waiting)) {
05223 *found = 1;
05224 }
05225 native_mutex_unlock(&mutex->lock);
05226 }
05227
05228 return (*found) ? ST_STOP : ST_CONTINUE;
05229 }
05230
05231 #ifdef DEBUG_DEADLOCK_CHECK
05232 static int
05233 debug_i(st_data_t key, st_data_t val, int *found)
05234 {
05235 VALUE thval = key;
05236 rb_thread_t *th;
05237 GetThreadPtr(thval, th);
05238
05239 printf("th:%p %d %d", th, th->status, th->interrupt_flag);
05240 if (th->locking_mutex) {
05241 rb_mutex_t *mutex;
05242 GetMutexPtr(th->locking_mutex, mutex);
05243
05244 native_mutex_lock(&mutex->lock);
05245 printf(" %p %d\n", mutex->th, mutex->cond_waiting);
05246 native_mutex_unlock(&mutex->lock);
05247 }
05248 else
05249 puts("");
05250
05251 return ST_CONTINUE;
05252 }
05253 #endif
05254
05255 static void
05256 rb_check_deadlock(rb_vm_t *vm)
05257 {
05258 int found = 0;
05259
05260 if (vm_living_thread_num(vm) > vm->sleeper) return;
05261 if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
05262 if (patrol_thread && patrol_thread != GET_THREAD()) return;
05263
05264 st_foreach(vm->living_threads, check_deadlock_i, (st_data_t)&found);
05265
05266 if (!found) {
05267 VALUE argv[2];
05268 argv[0] = rb_eFatal;
05269 argv[1] = rb_str_new2("No live threads left. Deadlock?");
05270 #ifdef DEBUG_DEADLOCK_CHECK
05271 printf("%d %d %p %p\n", vm->living_threads->num_entries, vm->sleeper, GET_THREAD(), vm->main_thread);
05272 st_foreach(vm->living_threads, debug_i, (st_data_t)0);
05273 #endif
05274 vm->sleeper--;
05275 rb_threadptr_raise(vm->main_thread, 2, argv);
05276 }
05277 }
05278
05279 static void
05280 update_coverage(rb_event_flag_t event, VALUE proc, VALUE self, ID id, VALUE klass)
05281 {
05282 VALUE coverage = GET_THREAD()->cfp->iseq->coverage;
05283 if (coverage && RBASIC(coverage)->klass == 0) {
05284 long line = rb_sourceline() - 1;
05285 long count;
05286 if (RARRAY_AREF(coverage, line) == Qnil) {
05287 return;
05288 }
05289 count = FIX2LONG(RARRAY_AREF(coverage, line)) + 1;
05290 if (POSFIXABLE(count)) {
05291 RARRAY_ASET(coverage, line, LONG2FIX(count));
05292 }
05293 }
05294 }
05295
05296 VALUE
05297 rb_get_coverages(void)
05298 {
05299 return GET_VM()->coverages;
05300 }
05301
05302 void
05303 rb_set_coverages(VALUE coverages)
05304 {
05305 GET_VM()->coverages = coverages;
05306 rb_add_event_hook(update_coverage, RUBY_EVENT_COVERAGE, Qnil);
05307 }
05308
05309 void
05310 rb_reset_coverages(void)
05311 {
05312 GET_VM()->coverages = Qfalse;
05313 rb_remove_event_hook(update_coverage);
05314 }
05315
05316 VALUE
05317 rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data)
05318 {
05319 VALUE interrupt_mask = rb_hash_new();
05320 rb_thread_t *cur_th = GET_THREAD();
05321
05322 rb_hash_aset(interrupt_mask, rb_cObject, sym_never);
05323 rb_ary_push(cur_th->pending_interrupt_mask_stack, interrupt_mask);
05324
05325 return rb_ensure(b_proc, data, rb_ary_pop, cur_th->pending_interrupt_mask_stack);
05326 }
05327
05328 void
05329 ruby_kill(rb_pid_t pid, int sig)
05330 {
05331 int err;
05332 rb_thread_t *th = GET_THREAD();
05333
05334
05335
05336
05337
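    /*
     * The GVL is released around kill() and the thread then waits on
     * interrupt_cond, presumably so that a signal sent to the current
     * process is handled before this function returns.
     */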
05338 {
05339 GVL_UNLOCK_BEGIN();
05340 native_mutex_lock(&th->interrupt_lock);
05341 err = kill(pid, sig);
05342 native_cond_wait(&th->interrupt_cond, &th->interrupt_lock);
05343 native_mutex_unlock(&th->interrupt_lock);
05344 GVL_UNLOCK_END();
05345 }
05346 if (err < 0) {
05347 rb_sys_fail(0);
05348 }
05349 }
05350