00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
/* NOTE(review): glibc fortification is disabled for this translation unit —
 * presumably because the setjmp/stack-scanning tricks below trip the
 * _FORTIFY_SOURCE checks; confirm before removing. */
#undef _FORTIFY_SOURCE
#undef __USE_FORTIFY_LEVEL
#define __USE_FORTIFY_LEVEL 0
00057
00058
00059
00060 #include "eval_intern.h"
00061 #include "gc.h"
00062 #include "internal.h"
00063 #include "ruby/io.h"
00064 #include "ruby/thread.h"
00065
/* When the platform layer does not map Thread#priority onto native thread
 * priorities, emulate it in this file within [-3, 3]. */
#ifndef USE_NATIVE_THREAD_PRIORITY
#define USE_NATIVE_THREAD_PRIORITY 0
#define RUBY_THREAD_PRIORITY_MAX 3
#define RUBY_THREAD_PRIORITY_MIN -3
#endif

#ifndef THREAD_DEBUG
#define THREAD_DEBUG 0
#endif

/* Max/min representable time_t, valid whether time_t is signed or unsigned.
 * `unsigned_time_t` is a project typedef of the unsigned counterpart. */
#define TIMET_MAX (~(time_t)0 <= 0 ? (time_t)((~(unsigned_time_t)0) >> 1) : (time_t)(~(unsigned_time_t)0))
#define TIMET_MIN (~(time_t)0 <= 0 ? (time_t)(((unsigned_time_t)1) << (sizeof(time_t) * CHAR_BIT - 1)) : (time_t)0)
00078
/* Class objects for Mutex and ThreadShield (assigned during Init). */
VALUE rb_cMutex;
VALUE rb_cThreadShield;

/* Interned symbols :immediate, :on_blocking, :never — presumably the
 * Thread.handle_interrupt timing keywords; initialized elsewhere. */
static VALUE sym_immediate;
static VALUE sym_on_blocking;
static VALUE sym_never;

/* Forward declarations for helpers defined later in this file. */
static void sleep_timeval(rb_thread_t *th, struct timeval time, int spurious_check);
static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec, int spurious_check);
static void sleep_forever(rb_thread_t *th, int nodeadlock, int spurious_check);
static double timeofday(void);
static int rb_threadptr_dead(rb_thread_t *th);
static void rb_check_deadlock(rb_vm_t *vm);
static int rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th);
00093
/* Sentinel values queued on a thread's pending-interrupt queue to request
 * kill/termination; FIXNUMs, deliberately not real exception objects
 * (see terminate_i and the FIXNUM_P check in thread_join). */
#define eKillSignal INT2FIX(0)
#define eTerminateSignal INT2FIX(1)
/* Cleared at VM shutdown; volatile because it is presumably polled from
 * another (timer) thread — TODO confirm against the platform layer. */
static volatile int system_working = 1;

/* Pre-built "stream closed" exception stored in the VM. */
#define closed_stream_error GET_VM()->special_exceptions[ruby_error_closed_stream]
00099
00100 inline static void
00101 st_delete_wrap(st_table *table, st_data_t key)
00102 {
00103 st_delete(table, &key, 0);
00104 }
00105
00106
00107
/* Marker consumed by the platform implementation files included below. */
#define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION

/* State saved while a thread runs without the GVL (see BLOCKING_REGION). */
struct rb_blocking_region_buffer {
    enum rb_thread_status prev_status;  /* status to restore on re-entry */
    struct rb_unblock_callback oldubf;  /* unblock callback to restore */
};

static int set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
                                struct rb_unblock_callback *old, int fail_if_interrupted);
static void reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old);

static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
                                        rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted);
static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region);

/* Also record the IA64 register backing-store pointer; no-op elsewhere. */
#ifdef __ia64
#define RB_GC_SAVE_MACHINE_REGISTER_STACK(th)          \
    do{(th)->machine_register_stack_end = rb_ia64_bsp();}while(0)
#else
#define RB_GC_SAVE_MACHINE_REGISTER_STACK(th)
#endif
/* Spill live registers into th->machine_regs (via setjmp) and record the
 * stack end — presumably so the GC can scan this thread's stack while it
 * is parked without the GVL. */
#define RB_GC_SAVE_MACHINE_CONTEXT(th)              \
    do {                                            \
        FLUSH_REGISTER_WINDOWS;                     \
        RB_GC_SAVE_MACHINE_REGISTER_STACK(th);      \
        setjmp((th)->machine_regs);                 \
        SET_MACHINE_STACK_END(&(th)->machine_stack_end); \
    } while (0)
00136
/* Release the GVL for the statements between BEGIN and END; the pair
 * forms a single C block and must be used together. */
#define GVL_UNLOCK_BEGIN() do { \
  rb_thread_t *_th_stored = GET_THREAD(); \
  RB_GC_SAVE_MACHINE_CONTEXT(_th_stored); \
  gvl_release(_th_stored->vm);

#define GVL_UNLOCK_END() \
  gvl_acquire(_th_stored->vm, _th_stored); \
  rb_thread_set_current(_th_stored); \
} while(0)

/* Evaluates to `expr` only when it is a compile-time constant (GCC);
 * otherwise evaluates to `notconst`. */
#ifdef __GNUC__
#define only_if_constant(expr, notconst) (__builtin_constant_p(expr) ? (expr) : (notconst))
#else
#define only_if_constant(expr, notconst) notconst
#endif
/* Run `exec` with the GVL released and `ubf(ubfarg)` installed as the
 * unblock function.  If fail_if_interrupted is set and an interrupt is
 * already pending, the region is not entered and `exec` is skipped. */
#define BLOCKING_REGION(exec, ubf, ubfarg, fail_if_interrupted) do { \
    rb_thread_t *__th = GET_THREAD(); \
    struct rb_blocking_region_buffer __region; \
    if (blocking_region_begin(__th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \
	/* the entry above can only fail when fail_if_interrupted is a constant TRUE */ \
	!only_if_constant(fail_if_interrupted, TRUE)) { \
	exec; \
	blocking_region_end(__th, &__region); \
    }; \
} while(0)
00162
#if THREAD_DEBUG
#ifdef HAVE_VA_ARGS_MACRO
/* Variadic-macro form: capture call site file/line for the log prefix. */
void rb_thread_debug(const char *file, int line, const char *fmt, ...);
#define thread_debug(fmt, ...) rb_thread_debug(__FILE__, __LINE__, fmt, ##__VA_ARGS__)
#define POSITION_FORMAT "%s:%d:"
#define POSITION_ARGS ,file, line
#else
/* Fallback without variadic macros: no call-site information. */
void rb_thread_debug(const char *fmt, ...);
#define thread_debug rb_thread_debug
#define POSITION_FORMAT
#define POSITION_ARGS
#endif

# if THREAD_DEBUG < 0
/* Negative THREAD_DEBUG makes the level runtime-settable from Ruby. */
static int rb_thread_debug_enabled;

/*
 *  Returns the current thread debug level as an Integer.
 */
static VALUE
rb_thread_s_debug(void)
{
    return INT2NUM(rb_thread_debug_enabled);
}

/*
 *  Sets the thread debug level; a false/nil value disables debugging,
 *  otherwise the value is converted to an int.  Returns `val`.
 */
static VALUE
rb_thread_s_debug_set(VALUE self, VALUE val)
{
    rb_thread_debug_enabled = RTEST(val) ? NUM2INT(val) : 0;
    return val;
}
# else
# define rb_thread_debug_enabled THREAD_DEBUG
# endif
#else
/* Debugging compiled out: the if(0) keeps arguments type-checked while
 * generating no code. */
#define thread_debug if(0)printf
#endif
00213
/* Non-IA64 targets drop the register-stack argument at the call sites. */
#ifndef __ia64
#define thread_start_func_2(th, st, rst) thread_start_func_2(th, st)
#endif
NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start,
                                        VALUE *register_stack_start));
static void timer_thread_function(void *);

/* Pull in the platform thread layer and define DEBUG_OUT(), the
 * mutex-protected printf used by rb_thread_debug() below. */
#if defined(_WIN32)
#include "thread_win32.c"

#define DEBUG_OUT() \
  WaitForSingleObject(&debug_mutex, INFINITE); \
  printf(POSITION_FORMAT"%p - %s" POSITION_ARGS, GetCurrentThreadId(), buf); \
  fflush(stdout); \
  ReleaseMutex(&debug_mutex);

#elif defined(HAVE_PTHREAD_H)
#include "thread_pthread.c"

#define DEBUG_OUT() \
  pthread_mutex_lock(&debug_mutex); \
  printf(POSITION_FORMAT"%#"PRIxVALUE" - %s" POSITION_ARGS, (VALUE)pthread_self(), buf); \
  fflush(stdout); \
  pthread_mutex_unlock(&debug_mutex);

#else
#error "unsupported thread type"
#endif
00242
#if THREAD_DEBUG
static int debug_mutex_initialized = 1; /* 1 until first log call */
static rb_thread_lock_t debug_mutex;

/*
 * printf-style debug logger.  Formats into a stack buffer and emits it
 * through the platform DEBUG_OUT() macro under debug_mutex.
 */
void
rb_thread_debug(
#ifdef HAVE_VA_ARGS_MACRO
    const char *file, int line,
#endif
    const char *fmt, ...)
{
    va_list args;
    char buf[BUFSIZ];

    if (!rb_thread_debug_enabled) return;

    /* NOTE(review): lazy init is racy if two threads log for the first
     * time concurrently — presumably tolerated because debugging starts
     * single-threaded; confirm. */
    if (debug_mutex_initialized == 1) {
	debug_mutex_initialized = 0;
	native_mutex_initialize(&debug_mutex);
    }

    va_start(args, fmt);
    vsnprintf(buf, BUFSIZ, fmt, args);
    va_end(args);

    DEBUG_OUT();
}
#endif
00271
/*
 * Tear down the VM's GVL: release it (the caller holds it), destroy the
 * GVL itself, then the thread-destruct lock.  Order matters: the GVL must
 * be released before it can be destroyed.
 */
void
rb_vm_gvl_destroy(rb_vm_t *vm)
{
    gvl_release(vm);
    gvl_destroy(vm);
    native_mutex_destroy(&vm->thread_destruct_lock);
}
00279
/* Public wrapper: unlock a native thread lock. */
void
rb_thread_lock_unlock(rb_thread_lock_t *lock)
{
    native_mutex_unlock(lock);
}
00285
/* Public wrapper: destroy a native thread lock. */
void
rb_thread_lock_destroy(rb_thread_lock_t *lock)
{
    native_mutex_destroy(lock);
}
00291
/*
 * Install `func`/`arg` as th's unblock callback, saving the previous
 * callback into *old (when old is non-NULL).
 *
 * Returns FALSE without installing when fail_if_interrupted is set and an
 * interrupt is already pending.  Otherwise services pending interrupts and
 * retries until the callback can be installed with no interrupt racing in,
 * then returns TRUE.
 */
static int
set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
                     struct rb_unblock_callback *old, int fail_if_interrupted)
{
  check_ints:
    if (fail_if_interrupted) {
	if (RUBY_VM_INTERRUPTED_ANY(th)) {
	    return FALSE;
	}
    }
    else {
	RUBY_VM_CHECK_INTS(th);
    }

    native_mutex_lock(&th->interrupt_lock);
    if (RUBY_VM_INTERRUPTED_ANY(th)) {
	/* an interrupt slipped in between the check above and taking the
	 * lock: drop the lock and start over */
	native_mutex_unlock(&th->interrupt_lock);
	goto check_ints;
    }
    else {
	if (old) *old = th->unblock;
	th->unblock.func = func;
	th->unblock.arg = arg;
    }
    native_mutex_unlock(&th->interrupt_lock);

    return TRUE;
}
00320
/* Restore a previously-saved unblock callback under th's interrupt lock. */
static void
reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old)
{
    native_mutex_lock(&th->interrupt_lock);
    th->unblock = *old;
    native_mutex_unlock(&th->interrupt_lock);
}
00328
/*
 * Mark th as interrupted (trap-flavored when `trap` is non-zero) and, if
 * an unblock callback is installed, invoke it to kick the thread out of
 * whatever it is blocked on.  All under th's interrupt lock.
 */
static void
rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
{
    native_mutex_lock(&th->interrupt_lock);
    if (trap)
	RUBY_VM_SET_TRAP_INTERRUPT(th);
    else
	RUBY_VM_SET_INTERRUPT(th);
    if (th->unblock.func) {
	(th->unblock.func)(th->unblock.arg);
    }
    else {
	/* none: the thread is not blocked; the flag alone suffices */
    }
    native_mutex_unlock(&th->interrupt_lock);
}
00345
/* Deliver an ordinary (non-trap) interrupt to th. */
void
rb_threadptr_interrupt(rb_thread_t *th)
{
    rb_threadptr_interrupt_common(th, 0);
}
00351
/* Deliver a trap (signal-handler) interrupt to th. */
void
rb_threadptr_trap_interrupt(rb_thread_t *th)
{
    rb_threadptr_interrupt_common(th, 1);
}
00357
/*
 * st_foreach callback over vm->living_threads: queue eTerminateSignal on
 * every thread except the main thread and interrupt it.  Always returns
 * ST_CONTINUE so the whole table is visited.
 */
static int
terminate_i(st_data_t key, st_data_t val, rb_thread_t *main_thread)
{
    VALUE thval = key;
    rb_thread_t *th;
    GetThreadPtr(thval, th);

    if (th != main_thread) {
	thread_debug("terminate_i: %p\n", (void *)th);
	rb_threadptr_pending_interrupt_enque(th, eTerminateSignal);
	rb_threadptr_interrupt(th);
    }
    else {
	thread_debug("terminate_i: main thread (%p)\n", (void *)th);
    }
    return ST_CONTINUE;
}
00375
/* Internal representation of a Ruby Mutex. */
typedef struct rb_mutex_struct
{
    rb_thread_lock_t lock;              /* native lock backing the mutex */
    rb_thread_cond_t cond;              /* waiters block on this condition */
    struct rb_thread_struct volatile *th; /* owning thread, or NULL when unlocked */
    int cond_waiting;                   /* number of threads waiting on cond */
    struct rb_mutex_struct *next_mutex; /* link in owner's keeping_mutexes list */
    int allow_trap;                     /* may be locked/unlocked inside trap handlers */
} rb_mutex_t;

static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th);
00390
/*
 * Release every mutex th still holds (walking th->keeping_mutexes).
 * Used when a thread dies or the VM shuts down; an unlock failure here
 * indicates corrupted bookkeeping, hence rb_bug.
 */
void
rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
{
    const char *err;
    rb_mutex_t *mutex;
    rb_mutex_t *mutexes = th->keeping_mutexes;

    while (mutexes) {
	mutex = mutexes;
	/* advance before unlocking: rb_mutex_unlock_th() unlinks the
	 * mutex from the keeping list */
	mutexes = mutex->next_mutex;
	err = rb_mutex_unlock_th(mutex, th);
	if (err) rb_bug("invalid keeping_mutexes: %s", err);
    }
}
00407
/*
 * Terminate every thread except the main thread.  Must be called on the
 * main thread (rb_bug otherwise).  Repeatedly signals all living threads
 * and sleeps until it is the only thread left; an exception thrown while
 * waiting (caught via TH_EXEC_TAG) restarts the whole sweep.
 */
void
rb_thread_terminate_all(void)
{
    rb_thread_t *th = GET_THREAD(); /* main thread */
    rb_vm_t *vm = th->vm;

    if (vm->main_thread != th) {
	rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)",
	       (void *)vm->main_thread, (void *)th);
    }

    /* unlock all locking mutexes held by the main thread */
    rb_threadptr_unlock_all_locking_mutexes(th);

  retry:
    thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th);
    st_foreach(vm->living_threads, terminate_i, (st_data_t)th);

    while (!rb_thread_alone()) {
	int state;

	TH_PUSH_TAG(th);
	if ((state = TH_EXEC_TAG()) == 0) {
	    /* park until a dying thread interrupts us */
	    native_sleep(th, 0);
	    RUBY_VM_CHECK_INTS_BLOCKING(th);
	}
	TH_POP_TAG();

	if (state) {
	    /* an interrupt/exception arrived: re-signal survivors */
	    goto retry;
	}
    }
}
00441
/*
 * Minimal per-thread cleanup that is safe before exec(): mark the thread
 * dead and clear its machine-stack bookkeeping so the GC never scans it.
 */
static void
thread_cleanup_func_before_exec(void *th_ptr)
{
    rb_thread_t *th = th_ptr;
    th->status = THREAD_KILLED;
    th->machine_stack_start = th->machine_stack_end = 0;
#ifdef __ia64
    th->machine_register_stack_start = th->machine_register_stack_end = 0;
#endif
}
00452
/*
 * Full per-thread cleanup.  When `atfork` is set, native resources are
 * deliberately NOT destroyed — after fork() the native lock state may be
 * inconsistent, so only the Ruby-level fields are reset.
 */
static void
thread_cleanup_func(void *th_ptr, int atfork)
{
    rb_thread_t *th = th_ptr;

    th->locking_mutex = Qfalse;
    thread_cleanup_func_before_exec(th_ptr);

    /* skip native resource teardown in the atfork child */
    if (atfork)
	return;

    native_mutex_destroy(&th->interrupt_lock);
    native_thread_destroy(th);
}
00472
00473 static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *);
00474
/* Public wrapper: let the platform layer record th's stack bounds. */
void
ruby_thread_init_stack(rb_thread_t *th)
{
    native_thread_init_stack(th);
}
00480
/*
 * Body of every non-main Ruby thread: acquire the GVL, run the thread's
 * proc (Thread.new) or C function (rb_thread_create), then publish the
 * result or propagate the exception and tear the thread down.
 * Returns 0, the native thread's exit status.
 */
static int
thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start)
{
    int state;
    VALUE args = th->first_args;
    rb_proc_t *proc;
    rb_thread_list_t *join_list;
    rb_thread_t *main_th;
    VALUE errinfo = Qnil;
# ifdef USE_SIGALTSTACK
    void rb_register_sigaltstack(rb_thread_t *th);

    rb_register_sigaltstack(th);
# endif

    if (th == th->vm->main_thread)
	rb_bug("thread_start_func_2 must not used for main thread");

    ruby_thread_set_native(th);

    th->machine_stack_start = stack_start;
#ifdef __ia64
    th->machine_register_stack_start = register_stack_start;
#endif
    thread_debug("thread start: %p\n", (void *)th);

    gvl_acquire(th->vm, th);
    {
	thread_debug("thread start (get lock): %p\n", (void *)th);
	rb_thread_set_current(th);

	TH_PUSH_TAG(th);
	if ((state = EXEC_TAG()) == 0) {
	    SAVE_ROOT_JMPBUF(th, {
		if (!th->first_func) {
		    /* Thread.new { ... }: invoke the stored proc */
		    GetProcPtr(th->first_proc, proc);
		    th->errinfo = Qnil;
		    th->root_lep = rb_vm_ep_local_ep(proc->block.ep);
		    th->root_svar = Qnil;
		    EXEC_EVENT_HOOK(th, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, Qundef);
		    th->value = rb_vm_invoke_proc(th, proc, (int)RARRAY_LEN(args), RARRAY_PTR(args), 0);
		    EXEC_EVENT_HOOK(th, RUBY_EVENT_THREAD_END, th->self, 0, 0, Qundef);
		}
		else {
		    /* rb_thread_create(): call the C entry point */
		    th->value = (*th->first_func)((void *)args);
		}
	    });
	}
	else {
	    /* the body raised/threw: decide whether errinfo propagates
	     * to the main thread */
	    errinfo = th->errinfo;
	    if (state == TAG_FATAL) {
		/* fatal error: let it be handled elsewhere */
	    }
	    else if (th->safe_level >= 4) {
		/* $SAFE >= 4: never leak the exception out */
		errinfo = Qnil;
	    }
	    else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
		/* SystemExit is re-raised in the main thread below */
	    }
	    else if (th->vm->thread_abort_on_exception ||
		     th->abort_on_exception || RTEST(ruby_debug)) {
		/* abort_on_exception / -d: propagate to main thread */
	    }
	    else {
		/* default: the exception dies with this thread */
		errinfo = Qnil;
	    }
	    th->value = Qnil;
	}

	th->status = THREAD_KILLED;
	thread_debug("thread end: %p\n", (void *)th);

	main_th = th->vm->main_thread;
	if (main_th == th) {
	    ruby_stop(0);
	}
	if (RB_TYPE_P(errinfo, T_OBJECT)) {
	    /* treat this exception as an asynchronous event in main_th */
	    rb_threadptr_raise(main_th, 1, &errinfo);
	}
	TH_POP_TAG();

	/* locking_mutex must have been cleared by normal unwinding */
	if (th->locking_mutex != Qfalse) {
	    rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")",
		   (void *)th, th->locking_mutex);
	}

	/* delete self from living_threads */
	st_delete_wrap(th->vm->living_threads, th->self);
	if (rb_thread_alone()) {
	    /* wake up the main thread so it can notice we were last */
	    rb_threadptr_interrupt(main_th);
	}

	/* wake up all threads joining us */
	join_list = th->join_list;
	while (join_list) {
	    rb_threadptr_interrupt(join_list->th);
	    switch (join_list->th->status) {
	      case THREAD_STOPPED: case THREAD_STOPPED_FOREVER:
		join_list->th->status = THREAD_RUNNABLE;
	      default: break;
	    }
	    join_list = join_list->next;
	}

	rb_threadptr_unlock_all_locking_mutexes(th);
	rb_check_deadlock(th->vm);

	if (!th->root_fiber) {
	    rb_thread_recycle_stack_release(th->stack);
	    th->stack = 0;
	}
    }
    native_mutex_lock(&th->vm->thread_destruct_lock);
    /* make sure vm->running_thread never points to a dead thread */
    th->vm->running_thread = NULL;
    native_mutex_unlock(&th->vm->thread_destruct_lock);
    thread_cleanup_func(th, FALSE);
    gvl_release(th->vm);

    return 0;
}
00606
/*
 * Shared back-end of Thread.new / Thread.start / rb_thread_create():
 * populate the (already-allocated) thread object `thval`, spawn the
 * native thread, register it in living_threads and return thval.
 * Raises ThreadError when the thread group is frozen or the native
 * thread cannot be created.
 */
static VALUE
thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS))
{
    rb_thread_t *th, *current_th = GET_THREAD();
    int err;

    if (OBJ_FROZEN(GET_THREAD()->thgroup)) {
	rb_raise(rb_eThreadError,
		 "can't start a new thread (frozen ThreadGroup)");
    }
    GetThreadPtr(thval, th);

    /* setup thread environment: either a C function or the caller's block */
    th->first_func = fn;
    th->first_proc = fn ? Qfalse : rb_block_proc();
    th->first_args = args;

    th->priority = current_th->priority;
    th->thgroup = current_th->thgroup;

    /* pending-interrupt state; the mask stack is inherited from the parent */
    th->pending_interrupt_queue = rb_ary_tmp_new(0);
    th->pending_interrupt_queue_checked = 0;
    th->pending_interrupt_mask_stack = rb_ary_dup(current_th->pending_interrupt_mask_stack);
    RBASIC(th->pending_interrupt_mask_stack)->klass = 0; /* hide from ObjectSpace */

    th->interrupt_mask = 0;

    native_mutex_initialize(&th->interrupt_lock);

    /* kick thread */
    err = native_thread_create(th);
    if (err) {
	th->status = THREAD_KILLED;
	rb_raise(rb_eThreadError, "can't create Thread (%d)", err);
    }
    st_insert(th->vm->living_threads, thval, (st_data_t) th->thread_id);
    return thval;
}
00645
00646
00647
00648
00649
00650
00651
00652
00653
00654
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
/*
 * Thread.new(...) { ... }: allocate a thread object and run #initialize
 * (which is expected to set first_args via thread_initialize).  Raises
 * ThreadError when the VM is shutting down or when a subclass's
 * initialize failed to call super.
 */
static VALUE
thread_s_new(int argc, VALUE *argv, VALUE klass)
{
    rb_thread_t *th;
    VALUE thread = rb_thread_alloc(klass);

    if (GET_VM()->main_thread->status == THREAD_KILLED)
	rb_raise(rb_eThreadError, "can't alloc thread");

    rb_obj_call_init(thread, argc, argv);
    GetThreadPtr(thread, th);
    if (!th->first_args) {
	/* a subclass overrode initialize without calling super */
	rb_raise(rb_eThreadError, "uninitialized thread - check `%s#initialize'",
		 rb_class2name(klass));
    }
    return thread;
}
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692
00693
/* Thread.start / Thread.fork: like Thread.new but skips #initialize. */
static VALUE
thread_start(VALUE klass, VALUE args)
{
    return thread_create_core(rb_thread_alloc(klass), args, 0);
}
00699
00700
/*
 * Thread#initialize: requires a block; starts the thread via
 * thread_create_core.  A second call on an already-started thread raises
 * ThreadError, including (when available) the file:line where the first
 * block was defined.
 */
static VALUE
thread_initialize(VALUE thread, VALUE args)
{
    rb_thread_t *th;
    if (!rb_block_given_p()) {
	rb_raise(rb_eThreadError, "must be called with a block");
    }
    GetThreadPtr(thread, th);
    if (th->first_args) {
	VALUE proc = th->first_proc, line, loc;
	const char *file;
	/* no source location available (C-created thread or locationless proc) */
	if (!proc || !RTEST(loc = rb_proc_location(proc))) {
	    rb_raise(rb_eThreadError, "already initialized thread");
	}
	file = RSTRING_PTR(RARRAY_PTR(loc)[0]);
	if (NIL_P(line = RARRAY_PTR(loc)[1])) {
	    rb_raise(rb_eThreadError, "already initialized thread - %s",
		     file);
	}
	rb_raise(rb_eThreadError, "already initialized thread - %s:%d",
		 file, NUM2INT(line));
    }
    return thread_create_core(thread, args, 0);
}
00725
/* C API: spawn a Ruby thread running fn(arg). */
VALUE
rb_thread_create(VALUE (*fn)(ANYARGS), void *arg)
{
    return thread_create_core(rb_thread_alloc(rb_cThread), (VALUE)arg, fn);
}
00731
00732
00733
/* Sentinel "infinite" join timeout (seconds). */
#define DELAY_INFTY 1E30

/* Arguments threaded through thread_join_sleep()/remove_from_join_list(). */
struct join_arg {
    rb_thread_t *target, *waiting; /* thread being joined / thread joining */
    double limit;                  /* absolute deadline (timeofday() based) */
    int forever;                   /* set when no timeout was given */
};
00741
00742 static VALUE
00743 remove_from_join_list(VALUE arg)
00744 {
00745 struct join_arg *p = (struct join_arg *)arg;
00746 rb_thread_t *target_th = p->target, *th = p->waiting;
00747
00748 if (target_th->status != THREAD_KILLED) {
00749 rb_thread_list_t **p = &target_th->join_list;
00750
00751 while (*p) {
00752 if ((*p)->th == th) {
00753 *p = (*p)->next;
00754 break;
00755 }
00756 p = &(*p)->next;
00757 }
00758 }
00759
00760 return Qnil;
00761 }
00762
/*
 * Sleep until the join target dies or the deadline passes.  Returns Qtrue
 * when the target terminated, Qfalse on timeout.  `arg` is a
 * struct join_arg *.  Run under rb_ensure with remove_from_join_list.
 */
static VALUE
thread_join_sleep(VALUE arg)
{
    struct join_arg *p = (struct join_arg *)arg;
    rb_thread_t *target_th = p->target, *th = p->waiting;
    double now, limit = p->limit;

    while (target_th->status != THREAD_KILLED) {
	if (p->forever) {
	    /* deadlock-detectable sleep; the dying target interrupts us */
	    sleep_forever(th, 1, 0);
	}
	else {
	    now = timeofday();
	    if (now > limit) {
		thread_debug("thread_join: timeout (thid: %p)\n",
			     (void *)target_th->thread_id);
		return Qfalse;
	    }
	    sleep_wait_for_interrupt(th, limit - now, 0);
	}
	thread_debug("thread_join: interrupted (thid: %p)\n",
		     (void *)target_th->thread_id);
    }
    return Qtrue;
}
00788
/*
 * Core of Thread#join/#value: wait up to `delay` seconds (DELAY_INFTY =
 * forever) for target_th to die.  Returns target_th->self on success and
 * Qnil on timeout.  Re-raises the target's exception in the caller; a
 * FIXNUM errinfo is one of the internal kill/terminate sentinels and is
 * ignored, a T_NODE errinfo is a throw and is converted back to a jump.
 * Joining self or the main thread raises ThreadError (deadlock).
 */
static VALUE
thread_join(rb_thread_t *target_th, double delay)
{
    rb_thread_t *th = GET_THREAD();
    struct join_arg arg;

    if (th == target_th) {
	rb_raise(rb_eThreadError, "Target thread must not be current thread");
    }
    if (GET_VM()->main_thread == target_th) {
	rb_raise(rb_eThreadError, "Target thread must not be main thread");
    }

    arg.target = target_th;
    arg.waiting = th;
    arg.limit = timeofday() + delay; /* absolute deadline */
    arg.forever = delay == DELAY_INFTY;

    thread_debug("thread_join (thid: %p)\n", (void *)target_th->thread_id);

    if (target_th->status != THREAD_KILLED) {
	/* stack-allocated node; remove_from_join_list unlinks it on exit */
	rb_thread_list_t list;
	list.next = target_th->join_list;
	list.th = th;
	target_th->join_list = &list;
	if (!rb_ensure(thread_join_sleep, (VALUE)&arg,
		       remove_from_join_list, (VALUE)&arg)) {
	    return Qnil; /* timed out */
	}
    }

    thread_debug("thread_join: success (thid: %p)\n",
		 (void *)target_th->thread_id);

    if (target_th->errinfo != Qnil) {
	VALUE err = target_th->errinfo;

	if (FIXNUM_P(err)) {
	    /* internal kill/terminate sentinel: nothing to re-raise */
	}
	else if (RB_TYPE_P(target_th->errinfo, T_NODE)) {
	    /* an uncaught throw: re-create the jump in this thread */
	    rb_exc_raise(rb_vm_make_jump_tag_but_local_jump(
		GET_THROWOBJ_STATE(err), GET_THROWOBJ_VAL(err)));
	}
	else {
	    /* normal exception: propagate to the joining thread */
	    rb_exc_raise(err);
	}
    }
    return target_th->self;
}
00840
00841
00842
00843
00844
00845
00846
00847
00848
00849
00850
00851
00852
00853
00854
00855
00856
00857
00858
00859
00860
00861
00862
00863
00864
00865
00866
00867
00868
00869
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879
00880
00881 static VALUE
00882 thread_join_m(int argc, VALUE *argv, VALUE self)
00883 {
00884 rb_thread_t *target_th;
00885 double delay = DELAY_INFTY;
00886 VALUE limit;
00887
00888 GetThreadPtr(self, target_th);
00889
00890 rb_scan_args(argc, argv, "01", &limit);
00891 if (!NIL_P(limit)) {
00892 delay = rb_num2dbl(limit);
00893 }
00894
00895 return thread_join(target_th, delay);
00896 }
00897
00898
00899
00900
00901
00902
00903
00904
00905
00906
00907
00908
00909
00910
00911
/*
 * Thread#value: join self with no timeout, then return the value the
 * thread's block produced (th->value).
 */
static VALUE
thread_value(VALUE self)
{
    rb_thread_t *th;
    GetThreadPtr(self, th);
    thread_join(th, DELAY_INFTY);
    return th->value;
}
00920
00921
00922
00923
00924
00925 static struct timeval
00926 double2timeval(double d)
00927 {
00928 struct timeval time;
00929
00930 if (isinf(d)) {
00931 time.tv_sec = TIMET_MAX;
00932 time.tv_usec = 0;
00933 return time;
00934 }
00935
00936 time.tv_sec = (int)d;
00937 time.tv_usec = (int)((d - (int)d) * 1e6);
00938 if (time.tv_usec < 0) {
00939 time.tv_usec += (int)1e6;
00940 time.tv_sec -= 1;
00941 }
00942 return time;
00943 }
00944
/*
 * Sleep with no timeout until interrupted.  When `deadlockable` is set
 * the thread counts toward the VM's sleeper total so rb_check_deadlock()
 * can detect that every thread is asleep.  When `spurious_check` is set,
 * spurious wakeups loop back to sleep; otherwise any wakeup returns.
 */
static void
sleep_forever(rb_thread_t *th, int deadlockable, int spurious_check)
{
    enum rb_thread_status prev_status = th->status;
    enum rb_thread_status status = deadlockable ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;

    th->status = status;
    RUBY_VM_CHECK_INTS_BLOCKING(th);
    /* an interrupt handler may have changed th->status: re-test each pass */
    while (th->status == status) {
	if (deadlockable) {
	    th->vm->sleeper++;
	    rb_check_deadlock(th->vm);
	}
	native_sleep(th, 0);
	if (deadlockable) {
	    th->vm->sleeper--;
	}
	RUBY_VM_CHECK_INTS_BLOCKING(th);
	if (!spurious_check)
	    break;
    }
    th->status = prev_status;
}
00968
/*
 * Read the current time into *tp.  Prefers the monotonic clock when the
 * platform provides one, so relative timeouts are immune to wall-clock
 * adjustments; otherwise falls back to gettimeofday().
 */
static void
getclockofday(struct timeval *tp)
{
#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
    struct timespec now;

    if (clock_gettime(CLOCK_MONOTONIC, &now) == 0) {
	tp->tv_sec = now.tv_sec;
	tp->tv_usec = now.tv_nsec / 1000;
	return;
    }
#endif
    gettimeofday(tp, NULL);
}
00984
/*
 * Sleep for (at most) the duration `tv`, re-sleeping across interrupts
 * and, when `spurious_check` is set, across spurious wakeups, until the
 * absolute deadline is reached or th->status is changed by an interrupt.
 */
static void
sleep_timeval(rb_thread_t *th, struct timeval tv, int spurious_check)
{
    struct timeval to, tvn;
    enum rb_thread_status prev_status = th->status;

    /* compute the absolute deadline `to`, saturating at TIMET_MAX */
    getclockofday(&to);
    if (TIMET_MAX - tv.tv_sec < to.tv_sec)
	to.tv_sec = TIMET_MAX;
    else
	to.tv_sec += tv.tv_sec;
    if ((to.tv_usec += tv.tv_usec) >= 1000000) {
	if (to.tv_sec == TIMET_MAX)
	    to.tv_usec = 999999;
	else {
	    to.tv_sec++;
	    to.tv_usec -= 1000000;
	}
    }

    th->status = THREAD_STOPPED;
    RUBY_VM_CHECK_INTS_BLOCKING(th);
    while (th->status == THREAD_STOPPED) {
	native_sleep(th, &tv);
	RUBY_VM_CHECK_INTS_BLOCKING(th);
	getclockofday(&tvn);
	if (to.tv_sec < tvn.tv_sec) break;
	if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break;
	thread_debug("sleep_timeval: %ld.%.6ld > %ld.%.6ld\n",
		     (long)to.tv_sec, (long)to.tv_usec,
		     (long)tvn.tv_sec, (long)tvn.tv_usec);
	/* woke early: shrink tv to the time remaining until the deadline */
	tv.tv_sec = to.tv_sec - tvn.tv_sec;
	if ((tv.tv_usec = to.tv_usec - tvn.tv_usec) < 0) {
	    --tv.tv_sec;
	    tv.tv_usec += 1000000;
	}
	if (!spurious_check)
	    break;
    }
    th->status = prev_status;
}
01026
/* Sleep the current thread until interrupted (not deadlock-detectable). */
void
rb_thread_sleep_forever(void)
{
    thread_debug("rb_thread_sleep_forever\n");
    sleep_forever(GET_THREAD(), 0, 1);
}
01033
/* Like rb_thread_sleep_forever but participates in deadlock detection. */
static void
rb_thread_sleep_deadly(void)
{
    thread_debug("rb_thread_sleep_deadly\n");
    sleep_forever(GET_THREAD(), 1, 1);
}
01040
/*
 * Current time in seconds as a double.  Uses the monotonic clock when
 * available (intervals unaffected by wall-clock adjustments), otherwise
 * gettimeofday().
 */
static double
timeofday(void)
{
#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
    struct timespec ts;

    if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) {
	return (double)ts.tv_sec + (double)ts.tv_nsec * 1e-9;
    }
#endif
    {
	struct timeval now;

	gettimeofday(&now, NULL);
	return (double)now.tv_sec + (double)now.tv_usec * 1e-6;
    }
}
01057
/* Sleep for `sleepsec` seconds (double), interruptibly. */
static void
sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec, int spurious_check)
{
    sleep_timeval(th, double2timeval(sleepsec), spurious_check);
}
01063
/* Fixed 100ms sleep used by rb_thread_polling(). */
static void
sleep_for_polling(rb_thread_t *th)
{
    struct timeval time;
    time.tv_sec = 0;
    time.tv_usec = 100 * 1000;	/* 100 ms */
    sleep_timeval(th, time, 1);
}
01072
/* Public API: sleep the current thread for `time`, interruptibly. */
void
rb_thread_wait_for(struct timeval time)
{
    rb_thread_t *th = GET_THREAD();
    sleep_timeval(th, time, 1);
}
01079
/*
 * Legacy polling helper: when other threads exist, check interrupts and
 * nap ~100ms to yield time to them; a no-op for a lone thread.
 */
void
rb_thread_polling(void)
{
    if (!rb_thread_alone()) {
	rb_thread_t *th = GET_THREAD();
	RUBY_VM_CHECK_INTS_BLOCKING(th);
	sleep_for_polling(th);
    }
}
01089
01090
01091
01092
01093
01094
01095
01096
/* Public API: service any pending interrupts for the current thread. */
void
rb_thread_check_ints(void)
{
    RUBY_VM_CHECK_INTS_BLOCKING(GET_THREAD());
}
01102
01103
01104
01105
01106
/* Non-zero when buffered signals are waiting to be handled. */
int
rb_thread_check_trap_pending(void)
{
    int buffered = rb_signal_buff_size();

    return buffered != 0;
}
01112
01113
/* Non-zero when the thread `thval` has a pending (non-trap) interrupt. */
int
rb_thread_interrupted(VALUE thval)
{
    rb_thread_t *th;
    GetThreadPtr(thval, th);
    return (int)RUBY_VM_INTERRUPTED(th);
}
01121
/* Public API: sleep the current thread for `sec` whole seconds. */
void
rb_thread_sleep(int sec)
{
    rb_thread_wait_for(rb_time_timeval(INT2FIX(sec)));
}
01127
/*
 * Yield the GVL to another thread, but only when the current thread has
 * already run for at least `limits_us` microseconds (0 = always yield).
 * A no-op when no other thread exists.
 */
static void
rb_thread_schedule_limits(unsigned long limits_us)
{
    thread_debug("rb_thread_schedule\n");
    if (!rb_thread_alone()) {
	rb_thread_t *th = GET_THREAD();

	if (th->running_time_us >= limits_us) {
	    thread_debug("rb_thread_schedule/switch start\n");
	    /* save registers/stack end so the GC can scan us while parked */
	    RB_GC_SAVE_MACHINE_CONTEXT(th);
	    gvl_yield(th->vm, th);
	    rb_thread_set_current(th);
	    thread_debug("rb_thread_schedule/switch done\n");
	}
    }
}
01144
/*
 * Public API: unconditionally offer the GVL to other threads, then run
 * any interrupts that became pending while we were parked.
 */
void
rb_thread_schedule(void)
{
    rb_thread_t *cur_th = GET_THREAD();
    rb_thread_schedule_limits(0);

    if (UNLIKELY(RUBY_VM_INTERRUPTED_ANY(cur_th))) {
	rb_threadptr_execute_interrupts(cur_th, 0);
    }
}
01155
01156
01157
01158 static inline int
01159 blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
01160 rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted)
01161 {
01162 region->prev_status = th->status;
01163 if (set_unblock_function(th, ubf, arg, ®ion->oldubf, fail_if_interrupted)) {
01164 th->blocking_region_buffer = region;
01165 th->status = THREAD_STOPPED;
01166 thread_debug("enter blocking region (%p)\n", (void *)th);
01167 RB_GC_SAVE_MACHINE_CONTEXT(th);
01168 gvl_release(th->vm);
01169 return TRUE;
01170 }
01171 else {
01172 return FALSE;
01173 }
01174 }
01175
01176 static inline void
01177 blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
01178 {
01179 gvl_acquire(th->vm, th);
01180 rb_thread_set_current(th);
01181 thread_debug("leave blocking region (%p)\n", (void *)th);
01182 remove_signal_thread_list(th);
01183 th->blocking_region_buffer = 0;
01184 reset_unblock_function(th, ®ion->oldubf);
01185 if (th->status == THREAD_STOPPED) {
01186 th->status = region->prev_status;
01187 }
01188 }
01189
/*
 * Deprecated C API: heap-allocate a blocking-region buffer and enter a
 * GVL-released region with the default select-based unblock function.
 * Must be paired with rb_thread_blocking_region_end(), which frees it.
 */
struct rb_blocking_region_buffer *
rb_thread_blocking_region_begin(void)
{
    rb_thread_t *th = GET_THREAD();
    struct rb_blocking_region_buffer *region = ALLOC(struct rb_blocking_region_buffer);
    blocking_region_begin(th, region, ubf_select, th, FALSE);
    return region;
}
01198
/*
 * Deprecated C API: leave a region started by
 * rb_thread_blocking_region_begin() and free its buffer.  errno is saved
 * across the interrupt check so the caller's last syscall error survives.
 */
void
rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region)
{
    int saved_errno = errno;
    rb_thread_t *th = ruby_thread_from_native();
    blocking_region_end(th, region);
    xfree(region);
    RUBY_VM_CHECK_INTS_BLOCKING(th);
    errno = saved_errno;
}
01209
/*
 * Run func(data1) with the GVL released, installing ubf(data2) as the
 * unblock function (RUBY_UBF_IO / RUBY_UBF_PROCESS map to the default
 * select-based one).  When fail_if_interrupted is set and an interrupt is
 * already pending, func is not called and 0 is returned; otherwise the
 * function's return value is returned and pending interrupts are serviced
 * afterwards.  errno from inside the region is preserved for the caller.
 */
static void *
call_without_gvl(void *(*func)(void *), void *data1,
		 rb_unblock_function_t *ubf, void *data2, int fail_if_interrupted)
{
    void *val = 0;

    rb_thread_t *th = GET_THREAD();
    int saved_errno = 0;

    th->waiting_fd = -1; /* not an fd-specific wait */
    if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
	ubf = ubf_select;
	data2 = th;
    }

    BLOCKING_REGION({
	val = func(data1);
	saved_errno = errno;
    }, ubf, data2, fail_if_interrupted);

    if (!fail_if_interrupted) {
	RUBY_VM_CHECK_INTS_BLOCKING(th);
    }

    errno = saved_errno;

    return val;
}
01238
01239
01240
01241
01242
01243
01244
01245
01246
01247
01248
01249
01250
01251
01252
01253
01254
01255
01256
01257
01258
01259
01260
01261
01262
01263
01264
01265
01266
01267
01268
01269
01270
01271
01272
01273
01274
01275
01276
01277
01278
01279
01280
01281
01282
01283
01284
01285
01286
01287
01288
01289
01290
01291
01292
01293
01294
01295
01296
01297
01298
01299
01300
01301
01302
01303
01304
01305
01306
01307
01308
01309
01310
01311
01312
01313
01314
01315
01316
01317
01318
01319
01320
01321
01322
01323
/*
 * Public API: like rb_thread_call_without_gvl but skips `func` entirely
 * (returning 0) when an interrupt is already pending.
 */
void *
rb_thread_call_without_gvl2(void *(*func)(void *), void *data1,
			    rb_unblock_function_t *ubf, void *data2)
{
    return call_without_gvl(func, data1, ubf, data2, TRUE);
}
01330
/* Public API: run func(data1) without the GVL; see call_without_gvl. */
void *
rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
			    rb_unblock_function_t *ubf, void *data2)
{
    return call_without_gvl(func, data1, ubf, data2, FALSE);
}
01337
/*
 * Run an I/O primitive func(data1) without the GVL, advertising `fd` in
 * th->waiting_fd so other threads can detect/interrupt waits on a stream
 * being closed.  waiting_fd is cleared even when func raises (the tag
 * protects the region); the jump is then re-thrown.  errno from func is
 * preserved across the final interrupt check.
 */
VALUE
rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
{
    VALUE val = Qundef; /* shouldn't be used */
    rb_thread_t *th = GET_THREAD();
    int saved_errno = 0;
    int state;

    th->waiting_fd = fd;

    TH_PUSH_TAG(th);
    if ((state = EXEC_TAG()) == 0) {
	BLOCKING_REGION({
	    val = func(data1);
	    saved_errno = errno;
	}, ubf_select, th, FALSE);
    }
    TH_POP_TAG();

    /* clear waiting_fd anyway — even when func raised */
    th->waiting_fd = -1;

    if (state) {
	JUMP_TAG(state);
    }
    /* TODO: check func() */
    RUBY_VM_CHECK_INTS_BLOCKING(th);

    errno = saved_errno;

    return val;
}
01370
/*
 * Deprecated C API kept for compatibility: forwards to
 * rb_thread_call_without_gvl, reinterpreting the function pointer and
 * its return value (VALUE and void* are interchangeable here).
 */
VALUE
rb_thread_blocking_region(
    rb_blocking_function_t *func, void *data1,
    rb_unblock_function_t *ubf, void *data2)
{
    void *(*f)(void*) = (void *(*)(void*))func;
    return (VALUE)rb_thread_call_without_gvl(f, data1, ubf, data2);
}
01379
01380
01381
01382
01383
01384
01385
01386
01387
01388
01389
01390
01391
01392
01393
01394
01395
01396
01397
01398
01399
01400
01401
01402
01403
01404
01405
01406
01407
/* Re-acquire the GVL around func(data1).  Only callable from native code
 * that is currently inside a blocking region (i.e. under
 * rb_thread_call_without_gvl).  Calling it from a thread unknown to the
 * VM aborts the process; calling it while already holding the GVL is a
 * rb_bug().  The previous blocking region (and its unblock callback) is
 * restored before returning. */
void *
rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
{
    rb_thread_t *th = ruby_thread_from_native();
    struct rb_blocking_region_buffer *brb;
    struct rb_unblock_callback prev_unblock;
    void *r;

    if (th == 0) {
        /* no rb_thread_t in native TLS: this native thread was never
         * registered with the Ruby VM, so there is no GVL to take */
        fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n");
        exit(EXIT_FAILURE);
    }

    brb = (struct rb_blocking_region_buffer *)th->blocking_region_buffer;
    prev_unblock = th->unblock;

    if (brb == 0) {
        rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL.");
    }

    blocking_region_end(th, brb); /* leave the blocking region: GVL re-acquired */

    r = (*func)(data1);

    /* re-enter the blocking region with the saved unblock callback */
    blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE);
    return r;
}
01440
01441
01442
01443
01444
01445
01446
01447
01448
01449
01450 int
01451 ruby_thread_has_gvl_p(void)
01452 {
01453 rb_thread_t *th = ruby_thread_from_native();
01454
01455 if (th && th->blocking_region_buffer == 0) {
01456 return 1;
01457 }
01458 else {
01459 return 0;
01460 }
01461 }
01462
01463
01464
01465
01466
01467
01468
01469
01470
01471 static VALUE
01472 thread_s_pass(VALUE klass)
01473 {
01474 rb_thread_schedule();
01475 return Qnil;
01476 }
01477
01478
01479
01480
01481
01482
01483
01484
01485
01486
01487
01488
01489
01490
01491
01492
01493
01494
01495
01496 void
01497 rb_threadptr_pending_interrupt_clear(rb_thread_t *th)
01498 {
01499 rb_ary_clear(th->pending_interrupt_queue);
01500 }
01501
01502 void
01503 rb_threadptr_pending_interrupt_enque(rb_thread_t *th, VALUE v)
01504 {
01505 rb_ary_push(th->pending_interrupt_queue, v);
01506 th->pending_interrupt_queue_checked = 0;
01507 }
01508
/* When a queued interrupt may fire, as decided by the
 * Thread.handle_interrupt mask stack. */
enum handle_interrupt_timing {
    INTERRUPT_NONE,        /* no mask entry matched */
    INTERRUPT_IMMEDIATE,   /* deliver as soon as possible */
    INTERRUPT_ON_BLOCKING, /* deliver only at a blocking operation */
    INTERRUPT_NEVER        /* defer while the mask is in effect */
};
01515
/* Decide how an exception of class +err+ may be delivered to +th+:
 * walk the handle_interrupt mask stack from the innermost (most
 * recently pushed) mask outward, and within each mask walk err's
 * ancestors so an entry for a superclass/module also applies.
 * The first matching entry wins; INTERRUPT_NONE if nothing matches. */
static enum handle_interrupt_timing
rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
{
    VALUE mask;
    long mask_stack_len = RARRAY_LEN(th->pending_interrupt_mask_stack);
    VALUE *mask_stack = RARRAY_PTR(th->pending_interrupt_mask_stack);
    VALUE ancestors = rb_mod_ancestors(err); /* includes err itself first */
    long ancestors_len = RARRAY_LEN(ancestors);
    VALUE *ancestors_ptr = RARRAY_PTR(ancestors);
    int i, j;

    for (i=0; i<mask_stack_len; i++) {
        mask = mask_stack[mask_stack_len-(i+1)]; /* newest mask first */

        for (j=0; j<ancestors_len; j++) {
            VALUE klass = ancestors_ptr[j];
            VALUE sym;

            /* mask maps ExceptionClass => :immediate/:on_blocking/:never */
            if ((sym = rb_hash_aref(mask, klass)) != Qnil) {
                if (sym == sym_immediate) {
                    return INTERRUPT_IMMEDIATE;
                }
                else if (sym == sym_on_blocking) {
                    return INTERRUPT_ON_BLOCKING;
                }
                else if (sym == sym_never) {
                    return INTERRUPT_NEVER;
                }
                else {
                    /* values are validated at push time; reaching here
                     * means the hash was mutated afterwards */
                    rb_raise(rb_eThreadError, "unknown mask signature");
                }
            }
        }
    }
    return INTERRUPT_NONE;
}
01554
01555 static int
01556 rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th)
01557 {
01558 return RARRAY_LEN(th->pending_interrupt_queue) == 0;
01559 }
01560
01561 static int
01562 rb_threadptr_pending_interrupt_include_p(rb_thread_t *th, VALUE err)
01563 {
01564 int i;
01565 for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) {
01566 VALUE e = RARRAY_PTR(th->pending_interrupt_queue)[i];
01567 if (rb_class_inherited_p(e, err)) {
01568 return TRUE;
01569 }
01570 }
01571 return FALSE;
01572 }
01573
/* Pop the first queued interrupt that the current handle_interrupt mask
 * allows to fire at +timing+; Qundef when nothing is deliverable now. */
static VALUE
rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timing timing)
{
#if 1 /* mask-aware scan of the whole queue */
    int i;

    for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) {
        VALUE err = RARRAY_PTR(th->pending_interrupt_queue)[i];

        enum handle_interrupt_timing mask_timing = rb_threadptr_pending_interrupt_check_mask(th, CLASS_OF(err));

        switch (mask_timing) {
          case INTERRUPT_ON_BLOCKING:
            if (timing != INTERRUPT_ON_BLOCKING) {
                break; /* not at a blocking point: leave it queued */
            }
            /* fall through: deliverable because we ARE at a blocking point */
          case INTERRUPT_NONE:      /* no mask entry: deliverable */
          case INTERRUPT_IMMEDIATE: /* explicitly :immediate */
            rb_ary_delete_at(th->pending_interrupt_queue, i);
            return err;
          case INTERRUPT_NEVER: /* masked off: skip for now */
            break;
        }
    }

    /* nothing deliverable: remember the queue has been examined */
    th->pending_interrupt_queue_checked = 1;
    return Qundef;
#else /* simple FIFO variant, kept for reference */
    VALUE err = rb_ary_shift(th->pending_interrupt_queue);
    if (rb_threadptr_pending_interrupt_empty_p(th)) {
        th->pending_interrupt_queue_checked = 1;
    }
    return err;
#endif
}
01610
01611 int
01612 rb_threadptr_pending_interrupt_active_p(rb_thread_t *th)
01613 {
01614
01615
01616
01617
01618
01619 if (th->pending_interrupt_queue_checked) {
01620 return 0;
01621 }
01622
01623 if (rb_threadptr_pending_interrupt_empty_p(th)) {
01624 return 0;
01625 }
01626
01627 return 1;
01628 }
01629
01630 static int
01631 handle_interrupt_arg_check_i(VALUE key, VALUE val)
01632 {
01633 if (val != sym_immediate && val != sym_on_blocking && val != sym_never) {
01634 rb_raise(rb_eArgError, "unknown mask signature");
01635 }
01636
01637 return ST_CONTINUE;
01638 }
01639
01640
01641
01642
01643
01644
01645
01646
01647
01648
01649
01650
01651
01652
01653
01654
01655
01656
01657
01658
01659
01660
01661
01662
01663
01664
01665
01666
01667
01668
01669
01670
01671
01672
01673
01674
01675
01676
01677
01678
01679
01680
01681
01682
01683
01684
01685
01686
01687
01688
01689
01690
01691
01692
01693
01694
01695
01696
01697
01698
01699
01700
01701
01702
01703
01704
01705
01706
01707
01708
01709
01710
01711
01712
01713
01714
01715
01716
01717
01718
01719
01720
01721
01722
01723
01724
01725
01726
01727
01728
01729
01730
01731
01732
01733
01734
01735
01736
01737
01738
01739
01740
01741
01742
01743
01744
01745
/* Thread.handle_interrupt(hash) { ... }: run the block with the given
 * async-interrupt mask pushed onto this thread's mask stack.  The hash
 * maps exception classes/modules to :immediate, :on_blocking or :never.
 * The mask is popped again even when the block exits via raise/throw
 * (hence the tag push/pop), and deferred interrupts are re-checked both
 * when the mask is installed and after it is removed. */
static VALUE
rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
{
    VALUE mask;
    rb_thread_t *th = GET_THREAD();
    VALUE r = Qnil;
    int state;

    if (!rb_block_given_p()) {
        rb_raise(rb_eArgError, "block is needed.");
    }

    mask = rb_convert_type(mask_arg, T_HASH, "Hash", "to_hash");
    /* validate every value before the mask becomes visible */
    rb_hash_foreach(mask, handle_interrupt_arg_check_i, 0);
    rb_ary_push(th->pending_interrupt_mask_stack, mask);
    if (!rb_threadptr_pending_interrupt_empty_p(th)) {
        /* queued interrupts must be re-examined under the new mask */
        th->pending_interrupt_queue_checked = 0;
        RUBY_VM_SET_INTERRUPT(th);
    }

    TH_PUSH_TAG(th);
    if ((state = EXEC_TAG()) == 0) {
        r = rb_yield(Qnil);
    }
    TH_POP_TAG();

    /* pop the mask unconditionally, then re-check interrupts that may
     * have been deferred while it was active */
    rb_ary_pop(th->pending_interrupt_mask_stack);
    if (!rb_threadptr_pending_interrupt_empty_p(th)) {
        th->pending_interrupt_queue_checked = 0;
        RUBY_VM_SET_INTERRUPT(th);
    }

    RUBY_VM_CHECK_INTS(th);

    if (state) {
        JUMP_TAG(state); /* propagate the block's non-local exit */
    }

    return r;
}
01786
01787
01788
01789
01790
01791
01792
01793
01794
01795
01796
01797 static VALUE
01798 rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread)
01799 {
01800 rb_thread_t *target_th;
01801
01802 GetThreadPtr(target_thread, target_th);
01803
01804 if (rb_threadptr_pending_interrupt_empty_p(target_th)) {
01805 return Qfalse;
01806 }
01807 else {
01808 if (argc == 1) {
01809 VALUE err;
01810 rb_scan_args(argc, argv, "01", &err);
01811 if (!rb_obj_is_kind_of(err, rb_cModule)) {
01812 rb_raise(rb_eTypeError, "class or module required for rescue clause");
01813 }
01814 if (rb_threadptr_pending_interrupt_include_p(target_th, err)) {
01815 return Qtrue;
01816 }
01817 else {
01818 return Qfalse;
01819 }
01820 }
01821 return Qtrue;
01822 }
01823 }
01824
01825
01826
01827
01828
01829
01830
01831
01832
01833
01834
01835
01836
01837
01838
01839
01840
01841
01842
01843
01844
01845
01846
01847
01848
01849
01850
01851
01852
01853
01854
01855
01856
01857
01858
01859
01860
01861
01862
01863
01864
01865
01866
01867
01868
01869
01870
01871
01872
01873
01874
01875
01876
01877
01878
01879
01880
01881
01882 static VALUE
01883 rb_thread_s_pending_interrupt_p(int argc, VALUE *argv, VALUE self)
01884 {
01885 return rb_thread_pending_interrupt_p(argc, argv, GET_THREAD()->self);
01886 }
01887
/* Force +th+ (the current thread) to terminate: discard queued
 * interrupts, mark the kill, and long-jump out with TAG_FATAL.
 * Does not return. */
static void
rb_threadptr_to_kill(rb_thread_t *th)
{
    rb_threadptr_pending_interrupt_clear(th);
    th->status = THREAD_RUNNABLE;
    th->to_kill = 1;
    th->errinfo = INT2FIX(TAG_FATAL);
    TH_JUMP_TAG(th, TAG_FATAL);
}
01897
/* Core interrupt dispatcher: drain and act on th's interrupt flags.
 * +blocking_timing+ is non-zero when called from a blocking point,
 * which additionally allows delivery of :on_blocking-masked interrupts.
 * Handles, in order: trap (signal) handlers on the main thread, queued
 * async exceptions / kill requests, deferred finalizers, and
 * timer-driven rescheduling.  Loops until no unmasked flags remain. */
void
rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
{
    if (th->raised_flag) return; /* already propagating an exception */

    while (1) {
        rb_atomic_t interrupt;
        rb_atomic_t old;
        int sig;
        int timer_interrupt;
        int pending_interrupt;
        int finalizer_interrupt;
        int trap_interrupt;

        /* atomically fetch the flags while leaving only the masked-off
         * bits stored; retry if another thread raced the CAS */
        do {
            interrupt = th->interrupt_flag;
            old = ATOMIC_CAS(th->interrupt_flag, interrupt, interrupt & th->interrupt_mask);
        } while (old != interrupt);

        interrupt &= (rb_atomic_t)~th->interrupt_mask; /* bits we may act on */
        if (!interrupt)
            return;

        timer_interrupt = interrupt & TIMER_INTERRUPT_MASK;
        pending_interrupt = interrupt & PENDING_INTERRUPT_MASK;
        finalizer_interrupt = interrupt & FINALIZER_INTERRUPT_MASK;
        trap_interrupt = interrupt & TRAP_INTERRUPT_MASK;

        /* signal (trap) handlers run only on the main thread */
        if (trap_interrupt && (th == th->vm->main_thread)) {
            enum rb_thread_status prev_status = th->status;
            th->status = THREAD_RUNNABLE; /* handler runs as runnable */
            while ((sig = rb_get_next_signal()) != 0) {
                rb_signal_exec(th, sig);
            }
            th->status = prev_status;
        }

        /* queued async exceptions / kill requests */
        if (pending_interrupt && rb_threadptr_pending_interrupt_active_p(th)) {
            VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
            thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);

            if (err == Qundef) {
                /* everything in the queue is currently masked */
            }
            else if (err == eKillSignal ||
                     err == eTerminateSignal ||
                     err == INT2FIX(TAG_FATAL) ) {
                rb_threadptr_to_kill(th); /* does not return */
            }
            else {
                /* wake a sleeping thread before raising into it */
                if (th->status == THREAD_STOPPED ||
                    th->status == THREAD_STOPPED_FOREVER)
                    th->status = THREAD_RUNNABLE;
                rb_exc_raise(err);
            }
        }

        if (finalizer_interrupt) {
            rb_gc_finalize_deferred();
        }

        if (timer_interrupt) {
            unsigned long limits_us = TIME_QUANTUM_USEC;

            /* higher priority => longer time slice, lower => shorter */
            if (th->priority > 0)
                limits_us <<= th->priority;
            else
                limits_us >>= -th->priority;

            if (th->status == THREAD_RUNNABLE)
                th->running_time_us += TIME_QUANTUM_USEC;

            EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0, Qundef);

            rb_thread_schedule_limits(limits_us);
        }
    }
}
01979
01980 void
01981 rb_thread_execute_interrupts(VALUE thval)
01982 {
01983 rb_thread_t *th;
01984 GetThreadPtr(thval, th);
01985 rb_threadptr_execute_interrupts(th, 1);
01986 }
01987
01988 static void
01989 rb_threadptr_ready(rb_thread_t *th)
01990 {
01991 rb_threadptr_interrupt(th);
01992 }
01993
01994 static VALUE
01995 rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv)
01996 {
01997 VALUE exc;
01998
01999 if (rb_threadptr_dead(th)) {
02000 return Qnil;
02001 }
02002
02003 if (argc == 0) {
02004 exc = rb_exc_new(rb_eRuntimeError, 0, 0);
02005 }
02006 else {
02007 exc = rb_make_exception(argc, argv);
02008 }
02009 rb_threadptr_pending_interrupt_enque(th, exc);
02010 rb_threadptr_interrupt(th);
02011 return Qnil;
02012 }
02013
02014 void
02015 rb_threadptr_signal_raise(rb_thread_t *th, int sig)
02016 {
02017 VALUE argv[2];
02018
02019 argv[0] = rb_eSignal;
02020 argv[1] = INT2FIX(sig);
02021 rb_threadptr_raise(th->vm->main_thread, 2, argv);
02022 }
02023
02024 void
02025 rb_threadptr_signal_exit(rb_thread_t *th)
02026 {
02027 VALUE argv[2];
02028
02029 argv[0] = rb_eSystemExit;
02030 argv[1] = rb_str_new2("exit");
02031 rb_threadptr_raise(th->vm->main_thread, 2, argv);
02032 }
02033
02034 #if defined(POSIX_SIGNAL) && defined(SIGSEGV) && defined(HAVE_SIGALTSTACK)
02035 #define USE_SIGALTSTACK
02036 #endif
02037
/* Deliver the preallocated SystemStackError for +th+.  With sigaltstack
 * support a normal raise is used; otherwise the error is stashed in
 * errinfo and we long-jump out directly with TAG_RAISE. */
void
ruby_thread_stack_overflow(rb_thread_t *th)
{
    th->raised_flag = 0;
#ifdef USE_SIGALTSTACK
    rb_exc_raise(sysstack_error);
#else
    th->errinfo = sysstack_error;
    TH_JUMP_TAG(th, TAG_RAISE);
#endif
}
02049
02050 int
02051 rb_threadptr_set_raised(rb_thread_t *th)
02052 {
02053 if (th->raised_flag & RAISED_EXCEPTION) {
02054 return 1;
02055 }
02056 th->raised_flag |= RAISED_EXCEPTION;
02057 return 0;
02058 }
02059
02060 int
02061 rb_threadptr_reset_raised(rb_thread_t *th)
02062 {
02063 if (!(th->raised_flag & RAISED_EXCEPTION)) {
02064 return 0;
02065 }
02066 th->raised_flag &= ~RAISED_EXCEPTION;
02067 return 1;
02068 }
02069
02070 static int
02071 thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data)
02072 {
02073 int fd = (int)data;
02074 rb_thread_t *th;
02075 GetThreadPtr((VALUE)key, th);
02076
02077 if (th->waiting_fd == fd) {
02078 VALUE err = th->vm->special_exceptions[ruby_error_closed_stream];
02079 rb_threadptr_pending_interrupt_enque(th, err);
02080 rb_threadptr_interrupt(th);
02081 }
02082 return ST_CONTINUE;
02083 }
02084
02085 void
02086 rb_thread_fd_close(int fd)
02087 {
02088 st_foreach(GET_THREAD()->vm->living_threads, thread_fd_close_i, (st_index_t)fd);
02089 }
02090
02091
02092
02093
02094
02095
02096
02097
02098
02099
02100
02101
02102
02103
02104
02105
02106
02107
02108
02109
02110
02111
02112 static VALUE
02113 thread_raise_m(int argc, VALUE *argv, VALUE self)
02114 {
02115 rb_thread_t *target_th;
02116 rb_thread_t *th = GET_THREAD();
02117 GetThreadPtr(self, target_th);
02118 rb_threadptr_raise(target_th, argc, argv);
02119
02120
02121 if (th == target_th) {
02122 RUBY_VM_CHECK_INTS(th);
02123 }
02124 return Qnil;
02125 }
02126
02127
02128
02129
02130
02131
02132
02133
02134
02135
02136
02137
02138
02139
/* Terminate +thread+ (Thread#kill).  Killing the main thread exits the
 * whole process; killing the current thread jumps out immediately via
 * TAG_FATAL; any other thread gets a kill marker queued and is woken. */
VALUE
rb_thread_kill(VALUE thread)
{
    rb_thread_t *th;

    GetThreadPtr(thread, th);

    /* $SAFE check: cross-thread kill of a less-tainted thread is
     * restricted under high safe levels */
    if (th != GET_THREAD() && th->safe_level < 4) {
        rb_secure(4);
    }
    if (th->to_kill || th->status == THREAD_KILLED) {
        return thread; /* already dying or dead: nothing to do */
    }
    if (th == th->vm->main_thread) {
        rb_exit(EXIT_SUCCESS); /* killing main == exiting the process */
    }

    thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id);

    if (th == GET_THREAD()) {
        /* self-kill: unwind right now */
        rb_threadptr_to_kill(th);
    }
    else {
        /* cross-thread: enqueue the kill marker and interrupt the target */
        rb_threadptr_pending_interrupt_enque(th, eKillSignal);
        rb_threadptr_interrupt(th);
    }
    return thread;
}
02169
02170
02171
02172
02173
02174
02175
02176
02177
02178
02179
02180
02181
02182
02183
02184
02185 static VALUE
02186 rb_thread_s_kill(VALUE obj, VALUE th)
02187 {
02188 return rb_thread_kill(th);
02189 }
02190
02191
02192
02193
02194
02195
02196
02197
02198
02199
02200
02201
02202 static VALUE
02203 rb_thread_exit(void)
02204 {
02205 rb_thread_t *th = GET_THREAD();
02206 return rb_thread_kill(th->self);
02207 }
02208
02209
02210
02211
02212
02213
02214
02215
02216
02217
02218
02219
02220
02221
02222
02223
02224
02225
02226
02227 VALUE
02228 rb_thread_wakeup(VALUE thread)
02229 {
02230 if (!RTEST(rb_thread_wakeup_alive(thread))) {
02231 rb_raise(rb_eThreadError, "killed thread");
02232 }
02233 return thread;
02234 }
02235
02236 VALUE
02237 rb_thread_wakeup_alive(VALUE thread)
02238 {
02239 rb_thread_t *th;
02240 GetThreadPtr(thread, th);
02241
02242 if (th->status == THREAD_KILLED) {
02243 return Qnil;
02244 }
02245 rb_threadptr_ready(th);
02246 if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER)
02247 th->status = THREAD_RUNNABLE;
02248 return thread;
02249 }
02250
02251
02252
02253
02254
02255
02256
02257
02258
02259
02260
02261
02262
02263
02264
02265
02266
02267
02268
02269
02270
02271 VALUE
02272 rb_thread_run(VALUE thread)
02273 {
02274 rb_thread_wakeup(thread);
02275 rb_thread_schedule();
02276 return thread;
02277 }
02278
02279
02280
02281
02282
02283
02284
02285
02286
02287
02288
02289
02290
02291
02292
02293
02294
02295
02296
02297
02298 VALUE
02299 rb_thread_stop(void)
02300 {
02301 if (rb_thread_alone()) {
02302 rb_raise(rb_eThreadError,
02303 "stopping only thread\n\tnote: use sleep to stop forever");
02304 }
02305 rb_thread_sleep_deadly();
02306 return Qnil;
02307 }
02308
/* living_threads iterator for Thread.list: collect runnable and
 * sleeping threads; any other status (e.g. killed) is skipped. */
static int
thread_list_i(st_data_t key, st_data_t val, void *data)
{
    VALUE ary = (VALUE)data;
    rb_thread_t *th;
    GetThreadPtr((VALUE)key, th);

    switch (th->status) {
      case THREAD_RUNNABLE:
      case THREAD_STOPPED:
      case THREAD_STOPPED_FOREVER:
        rb_ary_push(ary, th->self);
        /* fall through */
      default:
        break;
    }
    return ST_CONTINUE;
}
02326
02327
02328
02329
02330
02331
02332
02333
02334
02335
02336
02337
02338
02339
02340
02341
02342
02343
02344
02345
02346
02347
02348
02349 VALUE
02350 rb_thread_list(void)
02351 {
02352 VALUE ary = rb_ary_new();
02353 st_foreach(GET_THREAD()->vm->living_threads, thread_list_i, ary);
02354 return ary;
02355 }
02356
02357 VALUE
02358 rb_thread_current(void)
02359 {
02360 return GET_THREAD()->self;
02361 }
02362
02363
02364
02365
02366
02367
02368
02369
02370
02371
02372 static VALUE
02373 thread_s_current(VALUE klass)
02374 {
02375 return rb_thread_current();
02376 }
02377
02378 VALUE
02379 rb_thread_main(void)
02380 {
02381 return GET_THREAD()->vm->main_thread->self;
02382 }
02383
02384
02385
02386
02387
02388
02389
02390
02391 static VALUE
02392 rb_thread_s_main(VALUE klass)
02393 {
02394 return rb_thread_main();
02395 }
02396
02397
02398
02399
02400
02401
02402
02403
02404
02405
02406
02407
02408
02409
02410 static VALUE
02411 rb_thread_s_abort_exc(void)
02412 {
02413 return GET_THREAD()->vm->thread_abort_on_exception ? Qtrue : Qfalse;
02414 }
02415
02416
02417
02418
02419
02420
02421
02422
02423
02424
02425
02426
02427
02428
02429
02430
02431
02432
02433
02434
02435
02436
02437
02438
02439
02440
02441 static VALUE
02442 rb_thread_s_abort_exc_set(VALUE self, VALUE val)
02443 {
02444 rb_secure(4);
02445 GET_THREAD()->vm->thread_abort_on_exception = RTEST(val);
02446 return val;
02447 }
02448
02449
02450
02451
02452
02453
02454
02455
02456
02457
02458
02459 static VALUE
02460 rb_thread_abort_exc(VALUE thread)
02461 {
02462 rb_thread_t *th;
02463 GetThreadPtr(thread, th);
02464 return th->abort_on_exception ? Qtrue : Qfalse;
02465 }
02466
02467
02468
02469
02470
02471
02472
02473
02474
02475
02476
02477 static VALUE
02478 rb_thread_abort_exc_set(VALUE thread, VALUE val)
02479 {
02480 rb_thread_t *th;
02481 rb_secure(4);
02482
02483 GetThreadPtr(thread, th);
02484 th->abort_on_exception = RTEST(val);
02485 return val;
02486 }
02487
02488
02489
02490
02491
02492
02493
02494
02495
02496
02497
02498
02499 VALUE
02500 rb_thread_group(VALUE thread)
02501 {
02502 rb_thread_t *th;
02503 VALUE group;
02504 GetThreadPtr(thread, th);
02505 group = th->thgroup;
02506
02507 if (!group) {
02508 group = Qnil;
02509 }
02510 return group;
02511 }
02512
02513 static const char *
02514 thread_status_name(rb_thread_t *th)
02515 {
02516 switch (th->status) {
02517 case THREAD_RUNNABLE:
02518 if (th->to_kill)
02519 return "aborting";
02520 else
02521 return "run";
02522 case THREAD_STOPPED:
02523 case THREAD_STOPPED_FOREVER:
02524 return "sleep";
02525 case THREAD_KILLED:
02526 return "dead";
02527 default:
02528 return "unknown";
02529 }
02530 }
02531
02532 static int
02533 rb_threadptr_dead(rb_thread_t *th)
02534 {
02535 return th->status == THREAD_KILLED;
02536 }
02537
02538
02539
02540
02541
02542
02543
02544
02545
02546
02547
02548
02549
02550
02551
02552
02553
02554
02555
02556
02557
02558
02559
02560
02561 static VALUE
02562 rb_thread_status(VALUE thread)
02563 {
02564 rb_thread_t *th;
02565 GetThreadPtr(thread, th);
02566
02567 if (rb_threadptr_dead(th)) {
02568 if (!NIL_P(th->errinfo) && !FIXNUM_P(th->errinfo)
02569 ) {
02570 return Qnil;
02571 }
02572 return Qfalse;
02573 }
02574 return rb_str_new2(thread_status_name(th));
02575 }
02576
02577
02578
02579
02580
02581
02582
02583
02584
02585
02586
02587
02588
02589
02590 static VALUE
02591 rb_thread_alive_p(VALUE thread)
02592 {
02593 rb_thread_t *th;
02594 GetThreadPtr(thread, th);
02595
02596 if (rb_threadptr_dead(th))
02597 return Qfalse;
02598 return Qtrue;
02599 }
02600
02601
02602
02603
02604
02605
02606
02607
02608
02609
02610
02611
02612
02613 static VALUE
02614 rb_thread_stop_p(VALUE thread)
02615 {
02616 rb_thread_t *th;
02617 GetThreadPtr(thread, th);
02618
02619 if (rb_threadptr_dead(th))
02620 return Qtrue;
02621 if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER)
02622 return Qtrue;
02623 return Qfalse;
02624 }
02625
02626
02627
02628
02629
02630
02631
02632
02633
02634
02635
02636
02637
02638 static VALUE
02639 rb_thread_safe_level(VALUE thread)
02640 {
02641 rb_thread_t *th;
02642 GetThreadPtr(thread, th);
02643
02644 return INT2NUM(th->safe_level);
02645 }
02646
02647
02648
02649
02650
02651
02652
02653
02654 static VALUE
02655 rb_thread_inspect(VALUE thread)
02656 {
02657 const char *cname = rb_obj_classname(thread);
02658 rb_thread_t *th;
02659 const char *status;
02660 VALUE str;
02661
02662 GetThreadPtr(thread, th);
02663 status = thread_status_name(th);
02664 str = rb_sprintf("#<%s:%p %s>", cname, (void *)thread, status);
02665 OBJ_INFECT(str, thread);
02666
02667 return str;
02668 }
02669
02670 VALUE
02671 rb_thread_local_aref(VALUE thread, ID id)
02672 {
02673 rb_thread_t *th;
02674 st_data_t val;
02675
02676 GetThreadPtr(thread, th);
02677 if (rb_safe_level() >= 4 && th != GET_THREAD()) {
02678 rb_raise(rb_eSecurityError, "Insecure: thread locals");
02679 }
02680 if (!th->local_storage) {
02681 return Qnil;
02682 }
02683 if (st_lookup(th->local_storage, id, &val)) {
02684 return (VALUE)val;
02685 }
02686 return Qnil;
02687 }
02688
02689
02690
02691
02692
02693
02694
02695
02696
02697
02698
02699
02700
02701
02702
02703
02704
02705
02706
02707
02708
02709
02710
02711
02712
02713
02714
02715
02716
02717
02718
02719
02720
02721
02722
02723
02724
02725
02726
02727
02728
02729
02730
02731
02732
02733
02734
02735
02736
02737
02738
02739
02740
02741
02742
02743
02744
02745
02746
02747
02748
02749 static VALUE
02750 rb_thread_aref(VALUE thread, VALUE id)
02751 {
02752 return rb_thread_local_aref(thread, rb_to_id(id));
02753 }
02754
02755 VALUE
02756 rb_thread_local_aset(VALUE thread, ID id, VALUE val)
02757 {
02758 rb_thread_t *th;
02759 GetThreadPtr(thread, th);
02760
02761 if (rb_safe_level() >= 4 && th != GET_THREAD()) {
02762 rb_raise(rb_eSecurityError, "Insecure: can't modify thread locals");
02763 }
02764 if (OBJ_FROZEN(thread)) {
02765 rb_error_frozen("thread locals");
02766 }
02767 if (!th->local_storage) {
02768 th->local_storage = st_init_numtable();
02769 }
02770 if (NIL_P(val)) {
02771 st_delete_wrap(th->local_storage, id);
02772 return Qnil;
02773 }
02774 st_insert(th->local_storage, id, val);
02775 return val;
02776 }
02777
02778
02779
02780
02781
02782
02783
02784
02785
02786
02787
02788 static VALUE
02789 rb_thread_aset(VALUE self, VALUE id, VALUE val)
02790 {
02791 return rb_thread_local_aset(self, rb_to_id(id), val);
02792 }
02793
02794
02795
02796
02797
02798
02799
02800
02801
02802
02803
02804
02805
02806
02807
02808
02809
02810
02811
02812
02813
02814
02815
02816
02817
02818
02819
02820
02821
02822
02823
02824 static VALUE
02825 rb_thread_variable_get(VALUE thread, VALUE id)
02826 {
02827 VALUE locals;
02828 rb_thread_t *th;
02829
02830 GetThreadPtr(thread, th);
02831
02832 if (rb_safe_level() >= 4 && th != GET_THREAD()) {
02833 rb_raise(rb_eSecurityError, "Insecure: can't modify thread locals");
02834 }
02835
02836 locals = rb_iv_get(thread, "locals");
02837 return rb_hash_aref(locals, ID2SYM(rb_to_id(id)));
02838 }
02839
02840
02841
02842
02843
02844
02845
02846
02847
02848
02849 static VALUE
02850 rb_thread_variable_set(VALUE thread, VALUE id, VALUE val)
02851 {
02852 VALUE locals;
02853 rb_thread_t *th;
02854
02855 GetThreadPtr(thread, th);
02856
02857 if (rb_safe_level() >= 4 && th != GET_THREAD()) {
02858 rb_raise(rb_eSecurityError, "Insecure: can't modify thread locals");
02859 }
02860 if (OBJ_FROZEN(thread)) {
02861 rb_error_frozen("thread locals");
02862 }
02863
02864 locals = rb_iv_get(thread, "locals");
02865 return rb_hash_aset(locals, ID2SYM(rb_to_id(id)), val);
02866 }
02867
02868
02869
02870
02871
02872
02873
02874
02875
02876
02877
02878
02879
02880
02881 static VALUE
02882 rb_thread_key_p(VALUE self, VALUE key)
02883 {
02884 rb_thread_t *th;
02885 ID id = rb_to_id(key);
02886
02887 GetThreadPtr(self, th);
02888
02889 if (!th->local_storage) {
02890 return Qfalse;
02891 }
02892 if (st_lookup(th->local_storage, id, 0)) {
02893 return Qtrue;
02894 }
02895 return Qfalse;
02896 }
02897
02898 static int
02899 thread_keys_i(ID key, VALUE value, VALUE ary)
02900 {
02901 rb_ary_push(ary, ID2SYM(key));
02902 return ST_CONTINUE;
02903 }
02904
02905 static int
02906 vm_living_thread_num(rb_vm_t *vm)
02907 {
02908 return (int)vm->living_threads->num_entries;
02909 }
02910
02911 int
02912 rb_thread_alone(void)
02913 {
02914 int num = 1;
02915 if (GET_THREAD()->vm->living_threads) {
02916 num = vm_living_thread_num(GET_THREAD()->vm);
02917 thread_debug("rb_thread_alone: %d\n", num);
02918 }
02919 return num == 1;
02920 }
02921
02922
02923
02924
02925
02926
02927
02928
02929
02930
02931
02932
02933
02934
02935
02936 static VALUE
02937 rb_thread_keys(VALUE self)
02938 {
02939 rb_thread_t *th;
02940 VALUE ary = rb_ary_new();
02941 GetThreadPtr(self, th);
02942
02943 if (th->local_storage) {
02944 st_foreach(th->local_storage, thread_keys_i, ary);
02945 }
02946 return ary;
02947 }
02948
02949 static int
02950 keys_i(VALUE key, VALUE value, VALUE ary)
02951 {
02952 rb_ary_push(ary, key);
02953 return ST_CONTINUE;
02954 }
02955
02956
02957
02958
02959
02960
02961
02962
02963
02964
02965
02966
02967
02968
02969
02970
02971
02972
02973 static VALUE
02974 rb_thread_variables(VALUE thread)
02975 {
02976 VALUE locals;
02977 VALUE ary;
02978
02979 locals = rb_iv_get(thread, "locals");
02980 ary = rb_ary_new();
02981 rb_hash_foreach(locals, keys_i, ary);
02982
02983 return ary;
02984 }
02985
02986
02987
02988
02989
02990
02991
02992
02993
02994
02995
02996
02997
02998
02999
03000
03001
03002 static VALUE
03003 rb_thread_variable_p(VALUE thread, VALUE key)
03004 {
03005 VALUE locals;
03006
03007 locals = rb_iv_get(thread, "locals");
03008
03009 if (!RHASH(locals)->ntbl)
03010 return Qfalse;
03011
03012 if (st_lookup(RHASH(locals)->ntbl, ID2SYM(rb_to_id(key)), 0)) {
03013 return Qtrue;
03014 }
03015
03016 return Qfalse;
03017 }
03018
03019
03020
03021
03022
03023
03024
03025
03026
03027
03028
03029
03030
03031
03032
03033
03034 static VALUE
03035 rb_thread_priority(VALUE thread)
03036 {
03037 rb_thread_t *th;
03038 GetThreadPtr(thread, th);
03039 return INT2NUM(th->priority);
03040 }
03041
03042
03043
03044
03045
03046
03047
03048
03049
03050
03051
03052
03053
03054
03055
03056
03057
03058
03059
03060
03061
03062
03063
03064
03065
03066
03067
03068
03069 static VALUE
03070 rb_thread_priority_set(VALUE thread, VALUE prio)
03071 {
03072 rb_thread_t *th;
03073 int priority;
03074 GetThreadPtr(thread, th);
03075
03076 rb_secure(4);
03077
03078 #if USE_NATIVE_THREAD_PRIORITY
03079 th->priority = NUM2INT(prio);
03080 native_thread_apply_priority(th);
03081 #else
03082 priority = NUM2INT(prio);
03083 if (priority > RUBY_THREAD_PRIORITY_MAX) {
03084 priority = RUBY_THREAD_PRIORITY_MAX;
03085 }
03086 else if (priority < RUBY_THREAD_PRIORITY_MIN) {
03087 priority = RUBY_THREAD_PRIORITY_MIN;
03088 }
03089 th->priority = priority;
03090 #endif
03091 return INT2NUM(th->priority);
03092 }
03093
03094
03095
03096 #if defined(NFDBITS) && defined(HAVE_RB_FD_INIT)
03097
03098
03099
03100
03101
03102
03103
03104
03105
03106
03107
03108
03109
03110
03111
03112
03113
03114
03115
03116
03117
03118
03119
03120
03121
03122
03123
03124
03125
03126
03127
03128
03129 void
03130 rb_fd_init(rb_fdset_t *fds)
03131 {
03132 fds->maxfd = 0;
03133 fds->fdset = ALLOC(fd_set);
03134 FD_ZERO(fds->fdset);
03135 }
03136
03137 void
03138 rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
03139 {
03140 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
03141
03142 if (size < sizeof(fd_set))
03143 size = sizeof(fd_set);
03144 dst->maxfd = src->maxfd;
03145 dst->fdset = xmalloc(size);
03146 memcpy(dst->fdset, src->fdset, size);
03147 }
03148
03149 void
03150 rb_fd_term(rb_fdset_t *fds)
03151 {
03152 if (fds->fdset) xfree(fds->fdset);
03153 fds->maxfd = 0;
03154 fds->fdset = 0;
03155 }
03156
03157 void
03158 rb_fd_zero(rb_fdset_t *fds)
03159 {
03160 if (fds->fdset)
03161 MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS));
03162 }
03163
03164 static void
03165 rb_fd_resize(int n, rb_fdset_t *fds)
03166 {
03167 size_t m = howmany(n + 1, NFDBITS) * sizeof(fd_mask);
03168 size_t o = howmany(fds->maxfd, NFDBITS) * sizeof(fd_mask);
03169
03170 if (m < sizeof(fd_set)) m = sizeof(fd_set);
03171 if (o < sizeof(fd_set)) o = sizeof(fd_set);
03172
03173 if (m > o) {
03174 fds->fdset = xrealloc(fds->fdset, m);
03175 memset((char *)fds->fdset + o, 0, m - o);
03176 }
03177 if (n >= fds->maxfd) fds->maxfd = n + 1;
03178 }
03179
03180 void
03181 rb_fd_set(int n, rb_fdset_t *fds)
03182 {
03183 rb_fd_resize(n, fds);
03184 FD_SET(n, fds->fdset);
03185 }
03186
03187 void
03188 rb_fd_clr(int n, rb_fdset_t *fds)
03189 {
03190 if (n >= fds->maxfd) return;
03191 FD_CLR(n, fds->fdset);
03192 }
03193
03194 int
03195 rb_fd_isset(int n, const rb_fdset_t *fds)
03196 {
03197 if (n >= fds->maxfd) return 0;
03198 return FD_ISSET(n, fds->fdset) != 0;
03199 }
03200
03201 void
03202 rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max)
03203 {
03204 size_t size = howmany(max, NFDBITS) * sizeof(fd_mask);
03205
03206 if (size < sizeof(fd_set)) size = sizeof(fd_set);
03207 dst->maxfd = max;
03208 dst->fdset = xrealloc(dst->fdset, size);
03209 memcpy(dst->fdset, src, size);
03210 }
03211
03212 static void
03213 rb_fd_rcopy(fd_set *dst, rb_fdset_t *src)
03214 {
03215 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
03216
03217 if (size > sizeof(fd_set)) {
03218 rb_raise(rb_eArgError, "too large fdsets");
03219 }
03220 memcpy(dst, rb_fd_ptr(src), sizeof(fd_set));
03221 }
03222
03223 void
03224 rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src)
03225 {
03226 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
03227
03228 if (size < sizeof(fd_set))
03229 size = sizeof(fd_set);
03230 dst->maxfd = src->maxfd;
03231 dst->fdset = xrealloc(dst->fdset, size);
03232 memcpy(dst->fdset, src->fdset, size);
03233 }
03234
03235 #ifdef __native_client__
03236 int select(int nfds, fd_set *readfds, fd_set *writefds,
03237 fd_set *exceptfds, struct timeval *timeout);
03238 #endif
03239
03240 int
03241 rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout)
03242 {
03243 fd_set *r = NULL, *w = NULL, *e = NULL;
03244 if (readfds) {
03245 rb_fd_resize(n - 1, readfds);
03246 r = rb_fd_ptr(readfds);
03247 }
03248 if (writefds) {
03249 rb_fd_resize(n - 1, writefds);
03250 w = rb_fd_ptr(writefds);
03251 }
03252 if (exceptfds) {
03253 rb_fd_resize(n - 1, exceptfds);
03254 e = rb_fd_ptr(exceptfds);
03255 }
03256 return select(n, r, w, e, timeout);
03257 }
03258
03259 #undef FD_ZERO
03260 #undef FD_SET
03261 #undef FD_CLR
03262 #undef FD_ISSET
03263
03264 #define FD_ZERO(f) rb_fd_zero(f)
03265 #define FD_SET(i, f) rb_fd_set((i), (f))
03266 #define FD_CLR(i, f) rb_fd_clr((i), (f))
03267 #define FD_ISSET(i, f) rb_fd_isset((i), (f))
03268
03269 #elif defined(_WIN32)
03270
/* Win32: initialize set with capacity for FD_SETSIZE sockets, empty. */
void
rb_fd_init(rb_fdset_t *set)
{
    set->capa = FD_SETSIZE;
    set->fdset = ALLOC(fd_set);
    FD_ZERO(set->fdset);
}
03278
/* Win32: initialize dst and fill it with a copy of src. */
void
rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
{
    rb_fd_init(dst);
    rb_fd_dup(dst, src);
}
03285
03286 static void
03287 rb_fd_rcopy(fd_set *dst, rb_fdset_t *src)
03288 {
03289 int max = rb_fd_max(src);
03290
03291
03292
03293 if (max > FD_SETSIZE || (UINT)max > dst->fd_count) {
03294 rb_raise(rb_eArgError, "too large fdsets");
03295 }
03296
03297 memcpy(dst->fd_array, src->fdset->fd_array, max);
03298 dst->fd_count = max;
03299 }
03300
/* Win32: release the heap-allocated fd_set; *set may be re-initialized
 * later with rb_fd_init(). */
void
rb_fd_term(rb_fdset_t *set)
{
    xfree(set->fdset);
    set->fdset = NULL;
    set->capa = 0;
}
03308
/* Win32: add fd to the set, storing its underlying SOCKET handle.
 * Winsock fd_sets are plain arrays, so membership is a linear scan;
 * the backing array grows in FD_SETSIZE-sized steps when full. */
void
rb_fd_set(int fd, rb_fdset_t *set)
{
    unsigned int i;
    SOCKET s = rb_w32_get_osfhandle(fd);

    /* already present? */
    for (i = 0; i < set->fdset->fd_count; i++) {
        if (set->fdset->fd_array[i] == s) {
            return;
        }
    }
    if (set->fdset->fd_count >= (unsigned)set->capa) {
        /* round capacity up to the next multiple of FD_SETSIZE */
        set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE;
        set->fdset = xrealloc(set->fdset, sizeof(unsigned int) + sizeof(SOCKET) * set->capa);
    }
    set->fdset->fd_array[set->fdset->fd_count++] = s;
}
03326
/* Route this file's FD_* macros through the rb_fd_* wrappers (Win32). */
#undef FD_ZERO
#undef FD_SET
#undef FD_CLR
#undef FD_ISSET

#define FD_ZERO(f)	rb_fd_zero(f)
#define FD_SET(i, f)	rb_fd_set((i), (f))
#define FD_CLR(i, f)	rb_fd_clr((i), (f))
#define FD_ISSET(i, f)	rb_fd_isset((i), (f))

#else
/* plain-fd_set build: copying back is a simple struct assignment */
#define rb_fd_rcopy(d, s) (*(d) = *(s))
#endif
03340
/* Run native select() for the current thread with the GVL released,
 * transparently restarting after EINTR/ERESTART with the remaining
 * timeout.  select() mutates its fdsets in place, so copies are saved
 * up front and restored before each retry. */
static int
do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except,
          struct timeval *timeout)
{
    int UNINITIALIZED_VAR(result);
    int lerrno;
    rb_fdset_t UNINITIALIZED_VAR(orig_read);
    rb_fdset_t UNINITIALIZED_VAR(orig_write);
    rb_fdset_t UNINITIALIZED_VAR(orig_except);
    double limit = 0;
    struct timeval wait_rest;
    rb_thread_t *th = GET_THREAD();

    if (timeout) {
        /* remember the absolute deadline so retries only wait for the
         * remainder, and use a local copy the kernel may clobber */
        limit = timeofday();
        limit += (double)timeout->tv_sec+(double)timeout->tv_usec*1e-6;
        wait_rest = *timeout;
        timeout = &wait_rest;
    }

    if (read)
        rb_fd_init_copy(&orig_read, read);
    if (write)
        rb_fd_init_copy(&orig_write, write);
    if (except)
        rb_fd_init_copy(&orig_except, except);

  retry:
    lerrno = 0;

    BLOCKING_REGION({
        result = native_fd_select(n, read, write, except, timeout, th);
        if (result < 0) lerrno = errno;
    }, ubf_select, th, FALSE);

    RUBY_VM_CHECK_INTS_BLOCKING(th);

    errno = lerrno;

    if (result < 0) {
        switch (errno) {
          case EINTR:
#ifdef ERESTART
          case ERESTART:
#endif
            /* restore the sets select() clobbered, recompute the
             * remaining timeout, and go again */
            if (read)
                rb_fd_dup(read, &orig_read);
            if (write)
                rb_fd_dup(write, &orig_write);
            if (except)
                rb_fd_dup(except, &orig_except);

            if (timeout) {
                double d = limit - timeofday();

                wait_rest.tv_sec = (time_t)d;
                wait_rest.tv_usec = (int)((d-(double)wait_rest.tv_sec)*1e6);
                if (wait_rest.tv_sec < 0)  wait_rest.tv_sec = 0;
                if (wait_rest.tv_usec < 0) wait_rest.tv_usec = 0;
            }

            goto retry;
          default:
            break;
        }
    }

    if (read)
        rb_fd_term(&orig_read);
    if (write)
        rb_fd_term(&orig_write);
    if (except)
        rb_fd_term(&orig_except);

    return result;
}
03417
03418 static void
03419 rb_thread_wait_fd_rw(int fd, int read)
03420 {
03421 int result = 0;
03422 int events = read ? RB_WAITFD_IN : RB_WAITFD_OUT;
03423
03424 thread_debug("rb_thread_wait_fd_rw(%d, %s)\n", fd, read ? "read" : "write");
03425
03426 if (fd < 0) {
03427 rb_raise(rb_eIOError, "closed stream");
03428 }
03429
03430 result = rb_wait_for_single_fd(fd, events, NULL);
03431 if (result < 0) {
03432 rb_sys_fail(0);
03433 }
03434
03435 thread_debug("rb_thread_wait_fd_rw(%d, %s): done\n", fd, read ? "read" : "write");
03436 }
03437
/* Public API: block until fd is readable. */
void
rb_thread_wait_fd(int fd)
{
    rb_thread_wait_fd_rw(fd, 1);
}
03443
/* Public API: block until fd is writable; always returns TRUE. */
int
rb_thread_fd_writable(int fd)
{
    rb_thread_wait_fd_rw(fd, 0);
    return TRUE;
}
03450
03451 int
03452 rb_thread_select(int max, fd_set * read, fd_set * write, fd_set * except,
03453 struct timeval *timeout)
03454 {
03455 rb_fdset_t fdsets[3];
03456 rb_fdset_t *rfds = NULL;
03457 rb_fdset_t *wfds = NULL;
03458 rb_fdset_t *efds = NULL;
03459 int retval;
03460
03461 if (read) {
03462 rfds = &fdsets[0];
03463 rb_fd_init(rfds);
03464 rb_fd_copy(rfds, read, max);
03465 }
03466 if (write) {
03467 wfds = &fdsets[1];
03468 rb_fd_init(wfds);
03469 rb_fd_copy(wfds, write, max);
03470 }
03471 if (except) {
03472 efds = &fdsets[2];
03473 rb_fd_init(efds);
03474 rb_fd_copy(efds, except, max);
03475 }
03476
03477 retval = rb_thread_fd_select(max, rfds, wfds, efds, timeout);
03478
03479 if (rfds) {
03480 rb_fd_rcopy(read, rfds);
03481 rb_fd_term(rfds);
03482 }
03483 if (wfds) {
03484 rb_fd_rcopy(write, wfds);
03485 rb_fd_term(wfds);
03486 }
03487 if (efds) {
03488 rb_fd_rcopy(except, efds);
03489 rb_fd_term(efds);
03490 }
03491
03492 return retval;
03493 }
03494
03495 int
03496 rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
03497 struct timeval *timeout)
03498 {
03499 if (!read && !write && !except) {
03500 if (!timeout) {
03501 rb_thread_sleep_forever();
03502 return 0;
03503 }
03504 rb_thread_wait_for(*timeout);
03505 return 0;
03506 }
03507
03508 if (read) {
03509 rb_fd_resize(max - 1, read);
03510 }
03511 if (write) {
03512 rb_fd_resize(max - 1, write);
03513 }
03514 if (except) {
03515 rb_fd_resize(max - 1, except);
03516 }
03517 return do_select(max, read, write, except, timeout);
03518 }
03519
03520
03521
03522
03523
03524
/* Prefer poll(2) on Linux: unlike select(2) it has no FD_SETSIZE limit
 * on descriptor values. */
#if defined(HAVE_POLL) && defined(__linux__)
#  define USE_POLL
#endif

#ifdef USE_POLL

/* revents masks corresponding to the RB_WAITFD_* event classes */
#define POLLIN_SET (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR)
#define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR)
#define POLLEX_SET (POLLPRI)

#ifndef HAVE_PPOLL
03537
03538 int
03539 ppoll(struct pollfd *fds, nfds_t nfds,
03540 const struct timespec *ts, const sigset_t *sigmask)
03541 {
03542 int timeout_ms;
03543
03544 if (ts) {
03545 int tmp, tmp2;
03546
03547 if (ts->tv_sec > TIMET_MAX/1000)
03548 timeout_ms = -1;
03549 else {
03550 tmp = ts->tv_sec * 1000;
03551 tmp2 = ts->tv_nsec / (1000 * 1000);
03552 if (TIMET_MAX - tmp < tmp2)
03553 timeout_ms = -1;
03554 else
03555 timeout_ms = tmp + tmp2;
03556 }
03557 }
03558 else
03559 timeout_ms = -1;
03560
03561 return poll(fds, nfds, timeout_ms);
03562 }
03563 #endif
03564
03565
03566
03567
/* poll(2)-based single-fd wait.  Blocks (GVL released) until fd reports
 * one of the requested RB_WAITFD_* events or the optional timeout
 * expires.  Returns the bitmask of ready events, 0 on timeout, or -1
 * with errno set. */
int
rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
{
    struct pollfd fds;
    int result = 0, lerrno;
    double limit = 0;
    struct timespec ts;
    struct timespec *timeout = NULL;
    rb_thread_t *th = GET_THREAD();

    if (tv) {
        /* keep an absolute deadline so EINTR retries wait only for the
         * remaining time */
        ts.tv_sec = tv->tv_sec;
        ts.tv_nsec = tv->tv_usec * 1000;
        limit = timeofday();
        limit += (double)tv->tv_sec + (double)tv->tv_usec * 1e-6;
        timeout = &ts;
    }

    fds.fd = fd;
    fds.events = (short)events;

  retry:
    lerrno = 0;
    BLOCKING_REGION({
        result = ppoll(&fds, 1, timeout, NULL);
        if (result < 0) lerrno = errno;
    }, ubf_select, th, FALSE);

    RUBY_VM_CHECK_INTS_BLOCKING(th);

    if (result < 0) {
        errno = lerrno;
        switch (errno) {
          case EINTR:
#ifdef ERESTART
          case ERESTART:
#endif
            if (timeout) {
                double d = limit - timeofday();

                ts.tv_sec = (long)d;
                ts.tv_nsec = (long)((d - (double)ts.tv_sec) * 1e9);
                if (ts.tv_sec < 0)
                    ts.tv_sec = 0;
                if (ts.tv_nsec < 0)
                    ts.tv_nsec = 0;
            }
            goto retry;
        }
        return -1;
    }

    /* POLLNVAL means fd was not open: report as EBADF */
    if (fds.revents & POLLNVAL) {
        errno = EBADF;
        return -1;
    }

    /* translate revents into the RB_WAITFD_* bitmask */
    result = 0;
    if (fds.revents & POLLIN_SET)
        result |= RB_WAITFD_IN;
    if (fds.revents & POLLOUT_SET)
        result |= RB_WAITFD_OUT;
    if (fds.revents & POLLEX_SET)
        result |= RB_WAITFD_PRI;

    return result;
}
03639 #else
/* Initialize *fds as a fresh set containing exactly fd; returns fds. */
static rb_fdset_t *
init_set_fd(int fd, rb_fdset_t *fds)
{
    rb_fd_init(fds);
    rb_fd_set(fd, fds);

    return fds;
}
03648
/* Arguments for the select()-based single-fd wait emulation.
 * `as` is a union: `fd` on the way in, `error` (saved errno) on the
 * way out of select_single(). */
struct select_args {
    union {
        int fd;
        int error;
    } as;
    rb_fdset_t *read;
    rb_fdset_t *write;
    rb_fdset_t *except;
    struct timeval *tv;
};
03659
03660 static VALUE
03661 select_single(VALUE ptr)
03662 {
03663 struct select_args *args = (struct select_args *)ptr;
03664 int r;
03665
03666 r = rb_thread_fd_select(args->as.fd + 1,
03667 args->read, args->write, args->except, args->tv);
03668 if (r == -1)
03669 args->as.error = errno;
03670 if (r > 0) {
03671 r = 0;
03672 if (args->read && rb_fd_isset(args->as.fd, args->read))
03673 r |= RB_WAITFD_IN;
03674 if (args->write && rb_fd_isset(args->as.fd, args->write))
03675 r |= RB_WAITFD_OUT;
03676 if (args->except && rb_fd_isset(args->as.fd, args->except))
03677 r |= RB_WAITFD_PRI;
03678 }
03679 return (VALUE)r;
03680 }
03681
/* Ensure-handler for select_single: tear down whichever fd sets were
 * initialized.  Returns (VALUE)-1 so the caller sees failure. */
static VALUE
select_single_cleanup(VALUE ptr)
{
    struct select_args *args = (struct select_args *)ptr;

    if (args->read) rb_fd_term(args->read);
    if (args->write) rb_fd_term(args->write);
    if (args->except) rb_fd_term(args->except);

    return (VALUE)-1;
}
03693
03694 int
03695 rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
03696 {
03697 rb_fdset_t rfds, wfds, efds;
03698 struct select_args args;
03699 int r;
03700 VALUE ptr = (VALUE)&args;
03701
03702 args.as.fd = fd;
03703 args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
03704 args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
03705 args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
03706 args.tv = tv;
03707
03708 r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
03709 if (r == -1)
03710 errno = args.as.error;
03711
03712 return r;
03713 }
03714 #endif
03715
03716
03717
03718
03719
03720 #ifdef USE_CONSERVATIVE_STACK_END
/* Publish the address of a local variable as an approximation of the
 * current stack end for the conservative GC's stack scan.  The local
 * deliberately "escapes"; callers only use the address as a bound. */
void
rb_gc_set_stack_end(VALUE **stack_end_p)
{
    VALUE stack_end;
    *stack_end_p = &stack_end;
}
03727 #endif
03728
03729
03730
03731
03732
03733
/* If any signals are buffered, flag a trap interrupt on mth (expected
 * to be the main thread, which handles signal delivery). */
void
rb_threadptr_check_signal(rb_thread_t *mth)
{
    if (rb_signal_buff_size() > 0) {
        rb_threadptr_trap_interrupt(mth);
    }
}
03743
/* Periodic tick executed by the timer thread: request a timer interrupt
 * on the currently running thread (for time-slicing) and forward any
 * pending signals to the main thread. */
static void
timer_thread_function(void *arg)
{
    rb_vm_t *vm = GET_VM();

    /* thread_destruct_lock guards running_thread against concurrent
     * teardown while we poke its interrupt flag */
    native_mutex_lock(&vm->thread_destruct_lock);

    if (vm->running_thread)
        RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread);
    native_mutex_unlock(&vm->thread_destruct_lock);

    /* signal handling: delivered via the main thread */
    rb_threadptr_check_signal(vm->main_thread);

#if 0
    /* prove profiler (disabled) */
    if (vm->prove_profile.enable) {
        rb_thread_t *th = vm->running_thread;

        if (vm->during_gc) {
            /* GC prove profiling */
        }
    }
#endif
}
03774
/* Stop the timer thread if it is running; reset its bookkeeping once it
 * has actually stopped. */
void
rb_thread_stop_timer_thread(int close_anyway)
{
    if (timer_thread_id && native_stop_timer_thread(close_anyway)) {
        native_reset_timer_thread();
    }
}
03782
/* Reset timer-thread state without stopping it (e.g. after fork). */
void
rb_thread_reset_timer_thread(void)
{
    native_reset_timer_thread();
}
03788
/* (Re)start the timer thread and mark the VM as live. */
void
rb_thread_start_timer_thread(void)
{
    system_working = 1;
    rb_thread_create_timer_thread();
}
03795
03796 static int
03797 clear_coverage_i(st_data_t key, st_data_t val, st_data_t dummy)
03798 {
03799 int i;
03800 VALUE lines = (VALUE)val;
03801
03802 for (i = 0; i < RARRAY_LEN(lines); i++) {
03803 if (RARRAY_PTR(lines)[i] != Qnil) {
03804 RARRAY_PTR(lines)[i] = INT2FIX(0);
03805 }
03806 }
03807 return ST_CONTINUE;
03808 }
03809
/* Reset all coverage counters (used in the child after fork). */
static void
clear_coverage(void)
{
    VALUE coverages = rb_get_coverages();
    if (RTEST(coverages)) {
        st_foreach(RHASH_TBL(coverages), clear_coverage_i, 0);
    }
}
03818
/* Run in the child after fork(): the calling thread becomes the main
 * thread, every other (now nonexistent) thread is processed by the
 * given atfork callback, and the living-threads table is rebuilt to
 * contain only the survivor. */
static void
rb_thread_atfork_internal(int (*atfork)(st_data_t, st_data_t, st_data_t))
{
    rb_thread_t *th = GET_THREAD();
    rb_vm_t *vm = th->vm;
    VALUE thval = th->self;
    vm->main_thread = th;

    gvl_atfork(th->vm);
    st_foreach(vm->living_threads, atfork, (st_data_t)th);
    st_clear(vm->living_threads);
    st_insert(vm->living_threads, thval, (st_data_t)th->thread_id);
    vm->sleeper = 0;           /* no other thread can be sleeping now */
    clear_coverage();
}
03834
03835 static int
03836 terminate_atfork_i(st_data_t key, st_data_t val, st_data_t current_th)
03837 {
03838 VALUE thval = key;
03839 rb_thread_t *th;
03840 GetThreadPtr(thval, th);
03841
03842 if (th != (rb_thread_t *)current_th) {
03843 rb_mutex_abandon_keeping_mutexes(th);
03844 rb_mutex_abandon_locking_mutex(th);
03845 thread_cleanup_func(th, TRUE);
03846 }
03847 return ST_CONTINUE;
03848 }
03849
/* Public atfork hook (child side): terminate bookkeeping for all other
 * threads and re-seed the RNG so the child diverges from the parent. */
void
rb_thread_atfork(void)
{
    rb_thread_atfork_internal(terminate_atfork_i);
    GET_THREAD()->join_list = NULL;

    /* the child must not share the parent's random seed */
    rb_reset_random_seed();
}
03859
03860 static int
03861 terminate_atfork_before_exec_i(st_data_t key, st_data_t val, st_data_t current_th)
03862 {
03863 VALUE thval = key;
03864 rb_thread_t *th;
03865 GetThreadPtr(thval, th);
03866
03867 if (th != (rb_thread_t *)current_th) {
03868 thread_cleanup_func_before_exec(th);
03869 }
03870 return ST_CONTINUE;
03871 }
03872
/* Atfork hook used on the fork+exec path. */
void
rb_thread_atfork_before_exec(void)
{
    rb_thread_atfork_internal(terminate_atfork_before_exec_i);
}
03878
/* ThreadGroup internals: `enclosed` forbids moving threads in or out;
 * `group` points back at the wrapper VALUE. */
struct thgroup {
    int enclosed;
    VALUE group;
};
03883
/* dsize callback for ObjectSpace memory accounting. */
static size_t
thgroup_memsize(const void *ptr)
{
    return ptr ? sizeof(struct thgroup) : 0;
}
03889
/* TypedData descriptor for ThreadGroup (no mark; default free). */
static const rb_data_type_t thgroup_data_type = {
    "thgroup",
    {NULL, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,},
};
03894
03895
03896
03897
03898
03899
03900
03901
03902
03903
03904
03905
03906
03907
03908
03909
03910
03911
03912
03913 static VALUE
03914 thgroup_s_alloc(VALUE klass)
03915 {
03916 VALUE group;
03917 struct thgroup *data;
03918
03919 group = TypedData_Make_Struct(klass, struct thgroup, &thgroup_data_type, data);
03920 data->enclosed = 0;
03921 data->group = group;
03922
03923 return group;
03924 }
03925
/* Parameters threaded through thgroup_list_i via st_foreach. */
struct thgroup_list_params {
    VALUE ary;
    VALUE group;
};
03930
03931 static int
03932 thgroup_list_i(st_data_t key, st_data_t val, st_data_t data)
03933 {
03934 VALUE thread = (VALUE)key;
03935 VALUE ary = ((struct thgroup_list_params *)data)->ary;
03936 VALUE group = ((struct thgroup_list_params *)data)->group;
03937 rb_thread_t *th;
03938 GetThreadPtr(thread, th);
03939
03940 if (th->thgroup == group) {
03941 rb_ary_push(ary, thread);
03942 }
03943 return ST_CONTINUE;
03944 }
03945
03946
03947
03948
03949
03950
03951
03952
03953
03954
03955
03956 static VALUE
03957 thgroup_list(VALUE group)
03958 {
03959 VALUE ary = rb_ary_new();
03960 struct thgroup_list_params param;
03961
03962 param.ary = ary;
03963 param.group = group;
03964 st_foreach(GET_THREAD()->vm->living_threads, thgroup_list_i, (st_data_t) & param);
03965 return ary;
03966 }
03967
03968
03969
03970
03971
03972
03973
03974
03975
03976
03977
03978
03979
03980
03981
03982
03983
03984
03985
03986
/* ThreadGroup#enclose: once set, threads can no longer be added to or
 * removed from this group.  Irreversible. */
static VALUE
thgroup_enclose(VALUE group)
{
    struct thgroup *data;

    TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
    data->enclosed = 1;

    return group;
}
03997
03998
03999
04000
04001
04002
04003
04004
04005
04006
04007 static VALUE
04008 thgroup_enclosed_p(VALUE group)
04009 {
04010 struct thgroup *data;
04011
04012 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
04013 if (data->enclosed)
04014 return Qtrue;
04015 return Qfalse;
04016 }
04017
04018
04019
04020
04021
04022
04023
04024
04025
04026
04027
04028
04029
04030
04031
04032
04033
04034
04035
04036
04037
04038
04039
04040
04041
04042
04043
04044
/* ThreadGroup#add: move `thread` into `group`.  Both the destination
 * group and the thread's current group must be neither frozen nor
 * enclosed; each violation raises ThreadError.  Returns nil if the
 * thread has no group (e.g. dying), else the group. */
static VALUE
thgroup_add(VALUE group, VALUE thread)
{
    rb_thread_t *th;
    struct thgroup *data;

    rb_secure(4);
    GetThreadPtr(thread, th);

    /* check destination group first */
    if (OBJ_FROZEN(group)) {
        rb_raise(rb_eThreadError, "can't move to the frozen thread group");
    }
    TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
    if (data->enclosed) {
        rb_raise(rb_eThreadError, "can't move to the enclosed thread group");
    }

    if (!th->thgroup) {
        return Qnil;
    }

    /* then the group the thread currently belongs to */
    if (OBJ_FROZEN(th->thgroup)) {
        rb_raise(rb_eThreadError, "can't move from the frozen thread group");
    }
    TypedData_Get_Struct(th->thgroup, struct thgroup, &thgroup_data_type, data);
    if (data->enclosed) {
        rb_raise(rb_eThreadError,
                 "can't move from the enclosed thread group");
    }

    th->thgroup = group;
    return group;
}
04078
04079
04080
04081
04082
04083
04084
04085
04086
04087
04088
04089
04090
04091
04092
04093
04094
04095
04096
04097
04098
04099
04100
04101
04102
04103
04104
/* Extract the rb_mutex_t from a Mutex VALUE (raises on type mismatch). */
#define GetMutexPtr(obj, tobj) \
    TypedData_Get_Struct((obj), rb_mutex_t, &mutex_data_type, (tobj))

/* Mutexes hold no VALUE references, so no GC mark function is needed. */
#define mutex_mark NULL
04109
04110 static void
04111 mutex_free(void *ptr)
04112 {
04113 if (ptr) {
04114 rb_mutex_t *mutex = ptr;
04115 if (mutex->th) {
04116
04117 const char *err = rb_mutex_unlock_th(mutex, mutex->th);
04118 if (err) rb_bug("%s", err);
04119 }
04120 native_mutex_destroy(&mutex->lock);
04121 native_cond_destroy(&mutex->cond);
04122 }
04123 ruby_xfree(ptr);
04124 }
04125
/* dsize callback for ObjectSpace memory accounting. */
static size_t
mutex_memsize(const void *ptr)
{
    return ptr ? sizeof(rb_mutex_t) : 0;
}
04131
/* TypedData descriptor for Mutex. */
static const rb_data_type_t mutex_data_type = {
    "mutex",
    {mutex_mark, mutex_free, mutex_memsize,},
};
04136
04137 VALUE
04138 rb_obj_is_mutex(VALUE obj)
04139 {
04140 if (rb_typeddata_is_kind_of(obj, &mutex_data_type)) {
04141 return Qtrue;
04142 }
04143 else {
04144 return Qfalse;
04145 }
04146 }
04147
04148 static VALUE
04149 mutex_alloc(VALUE klass)
04150 {
04151 VALUE volatile obj;
04152 rb_mutex_t *mutex;
04153
04154 obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
04155 native_mutex_initialize(&mutex->lock);
04156 native_cond_initialize(&mutex->cond, RB_CONDATTR_CLOCK_MONOTONIC);
04157 return obj;
04158 }
04159
04160
04161
04162
04163
04164
04165
/* Mutex#initialize: nothing to do; allocation already set everything. */
static VALUE
mutex_initialize(VALUE self)
{
    return self;
}
04171
/* C API: create a new Mutex object. */
VALUE
rb_mutex_new(void)
{
    return mutex_alloc(rb_cMutex);
}
04177
04178
04179
04180
04181
04182
04183
/* Mutex#locked?: true when some thread currently owns the mutex. */
VALUE
rb_mutex_locked_p(VALUE self)
{
    rb_mutex_t *mutex;
    GetMutexPtr(self, mutex);
    return mutex->th ? Qtrue : Qfalse;
}
04191
/* Record that th now owns `self`: push the mutex onto the thread's
 * singly-linked keeping_mutexes list. */
static void
mutex_locked(rb_thread_t *th, VALUE self)
{
    rb_mutex_t *mutex;
    GetMutexPtr(self, mutex);

    if (th->keeping_mutexes) {
        mutex->next_mutex = th->keeping_mutexes;
    }
    th->keeping_mutexes = mutex;
}
04203
04204
04205
04206
04207
04208
04209
04210
04211 VALUE
04212 rb_mutex_trylock(VALUE self)
04213 {
04214 rb_mutex_t *mutex;
04215 VALUE locked = Qfalse;
04216 GetMutexPtr(self, mutex);
04217
04218 native_mutex_lock(&mutex->lock);
04219 if (mutex->th == 0) {
04220 mutex->th = GET_THREAD();
04221 locked = Qtrue;
04222
04223 mutex_locked(GET_THREAD(), self);
04224 }
04225 native_mutex_unlock(&mutex->lock);
04226
04227 return locked;
04228 }
04229
/* Wait loop for a contended mutex; called with mutex->lock held and the
 * GVL released.  Returns 0 when the lock was taken, 1 when a pending
 * interrupt was noticed, 2 when the patrol timeout expired (used for
 * deadlock detection).  timeout_ms == 0 means wait indefinitely. */
static int
lock_func(rb_thread_t *th, rb_mutex_t *mutex, int timeout_ms)
{
    int interrupted = 0;
    int err = 0;

    mutex->cond_waiting++;
    for (;;) {
        if (!mutex->th) {
            mutex->th = th;       /* lock is free: claim it */
            break;
        }
        if (RUBY_VM_INTERRUPTED(th)) {
            interrupted = 1;
            break;
        }
        if (err == ETIMEDOUT) {
            interrupted = 2;
            break;
        }

        if (timeout_ms) {
            struct timespec timeout_rel;
            struct timespec timeout;

            timeout_rel.tv_sec = 0;
            timeout_rel.tv_nsec = timeout_ms * 1000 * 1000;
            timeout = native_cond_timeout(&mutex->cond, timeout_rel);
            err = native_cond_timedwait(&mutex->cond, &mutex->lock, &timeout);
        }
        else {
            native_cond_wait(&mutex->cond, &mutex->lock);
            err = 0;
        }
    }
    mutex->cond_waiting--;

    return interrupted;
}
04269
/* Unblock-function for rb_mutex_lock: wake every waiter on the mutex's
 * condvar so they can notice a pending interrupt. */
static void
lock_interrupt(void *ptr)
{
    rb_mutex_t *mutex = (rb_mutex_t *)ptr;
    native_mutex_lock(&mutex->lock);
    if (mutex->cond_waiting > 0)
        native_cond_broadcast(&mutex->cond);
    native_mutex_unlock(&mutex->lock);
}
04279
04280
04281
04282
04283
04284
/* At most one blocked thread waits with a timeout (the "patrol" thread)
 * so that deadlock detection can run without every waiter polling. */
static const rb_thread_t *patrol_thread = NULL;
04286
04287
04288
04289
04290
04291
04292
04293
/* Mutex#lock: acquire the mutex, blocking with the GVL released while
 * it is contended.  Raises ThreadError on recursive locking or when
 * called from trap context (unless the mutex allows it). */
VALUE
rb_mutex_lock(VALUE self)
{
    rb_thread_t *th = GET_THREAD();
    rb_mutex_t *mutex;
    GetMutexPtr(self, mutex);

    /* When running trap handler */
    if (!mutex->allow_trap && th->interrupt_mask & TRAP_INTERRUPT_MASK) {
        rb_raise(rb_eThreadError, "can't be called from trap context");
    }

    if (rb_mutex_trylock(self) == Qfalse) {
        if (mutex->th == GET_THREAD()) {
            rb_raise(rb_eThreadError, "deadlock; recursive locking");
        }

        while (mutex->th != th) {
            int interrupted;
            enum rb_thread_status prev_status = th->status;
            volatile int timeout_ms = 0;
            struct rb_unblock_callback oldubf;

            set_unblock_function(th, lock_interrupt, mutex, &oldubf, FALSE);
            th->status = THREAD_STOPPED_FOREVER;
            th->locking_mutex = self;

            native_mutex_lock(&mutex->lock);
            th->vm->sleeper++;

            /* If everyone else is sleeping and nobody is patrolling,
             * this thread waits with a 100ms timeout so deadlocks can
             * be detected. */
            if ((vm_living_thread_num(th->vm) == th->vm->sleeper) &&
                !patrol_thread) {
                timeout_ms = 100;
                patrol_thread = th;
            }

            GVL_UNLOCK_BEGIN();
            interrupted = lock_func(th, mutex, (int)timeout_ms);
            native_mutex_unlock(&mutex->lock);
            GVL_UNLOCK_END();

            if (patrol_thread == th)
                patrol_thread = NULL;

            reset_unblock_function(th, &oldubf);

            th->locking_mutex = Qfalse;
            /* interrupted == 2: patrol timeout fired while the mutex is
             * still held elsewhere — check for a VM-wide deadlock */
            if (mutex->th && interrupted == 2) {
                rb_check_deadlock(th->vm);
            }
            if (th->status == THREAD_STOPPED_FOREVER) {
                th->status = prev_status;
            }
            th->vm->sleeper--;

            if (mutex->th == th) mutex_locked(th, self);

            if (interrupted) {
                RUBY_VM_CHECK_INTS_BLOCKING(th);
            }
        }
    }
    return self;
}
04362
04363
04364
04365
04366
04367
04368
04369
04370 VALUE
04371 rb_mutex_owned_p(VALUE self)
04372 {
04373 VALUE owned = Qfalse;
04374 rb_thread_t *th = GET_THREAD();
04375 rb_mutex_t *mutex;
04376
04377 GetMutexPtr(self, mutex);
04378
04379 if (mutex->th == th)
04380 owned = Qtrue;
04381
04382 return owned;
04383 }
04384
/* Release `mutex` on behalf of `th`.  Returns NULL on success, or a
 * static error message when th does not own the mutex.  On success the
 * mutex is also unlinked from th's keeping_mutexes list. */
static const char *
rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th)
{
    const char *err = NULL;

    native_mutex_lock(&mutex->lock);

    if (mutex->th == 0) {
        err = "Attempt to unlock a mutex which is not locked";
    }
    else if (mutex->th != th) {
        err = "Attempt to unlock a mutex which is locked by another thread";
    }
    else {
        mutex->th = 0;
        /* hand off to one waiter, if any */
        if (mutex->cond_waiting > 0)
            native_cond_signal(&mutex->cond);
    }

    native_mutex_unlock(&mutex->lock);

    if (!err) {
        /* walk the singly-linked list to splice this mutex out */
        rb_mutex_t *volatile *th_mutex = &th->keeping_mutexes;
        while (*th_mutex != mutex) {
            th_mutex = &(*th_mutex)->next_mutex;
        }
        *th_mutex = mutex->next_mutex;
        mutex->next_mutex = NULL;
    }

    return err;
}
04417
04418
04419
04420
04421
04422
04423
04424
04425 VALUE
04426 rb_mutex_unlock(VALUE self)
04427 {
04428 const char *err;
04429 rb_mutex_t *mutex;
04430 GetMutexPtr(self, mutex);
04431
04432 err = rb_mutex_unlock_th(mutex, GET_THREAD());
04433 if (err) rb_raise(rb_eThreadError, "%s", err);
04434
04435 return self;
04436 }
04437
/* Fork cleanup: drop (without unlocking) every mutex th was holding. */
static void
rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
{
    if (th->keeping_mutexes) {
        rb_mutex_abandon_all(th->keeping_mutexes);
    }
    th->keeping_mutexes = NULL;
}
04446
/* Fork cleanup: if th was blocked acquiring a mutex, forget about it;
 * if it had actually obtained ownership, abandon that too. */
static void
rb_mutex_abandon_locking_mutex(rb_thread_t *th)
{
    rb_mutex_t *mutex;

    if (!th->locking_mutex) return;

    GetMutexPtr(th->locking_mutex, mutex);
    if (mutex->th == th)
        rb_mutex_abandon_all(mutex);
    th->locking_mutex = Qfalse;
}
04459
04460 static void
04461 rb_mutex_abandon_all(rb_mutex_t *mutexes)
04462 {
04463 rb_mutex_t *mutex;
04464
04465 while (mutexes) {
04466 mutex = mutexes;
04467 mutexes = mutex->next_mutex;
04468 mutex->th = 0;
04469 mutex->next_mutex = 0;
04470 }
04471 }
04472
/* rb_ensure body for Mutex#sleep with no timeout; `time` is unused
 * (signature dictated by rb_ensure). */
static VALUE
rb_mutex_sleep_forever(VALUE time)
{
    sleep_forever(GET_THREAD(), 1, 0); /* permit spurious check */
    return Qnil;
}
04479
04480 static VALUE
04481 rb_mutex_wait_for(VALUE time)
04482 {
04483 struct timeval *t = (struct timeval *)time;
04484 sleep_timeval(GET_THREAD(), *t, 0);
04485 return Qnil;
04486 }
04487
04488 VALUE
04489 rb_mutex_sleep(VALUE self, VALUE timeout)
04490 {
04491 time_t beg, end;
04492 struct timeval t;
04493
04494 if (!NIL_P(timeout)) {
04495 t = rb_time_interval(timeout);
04496 }
04497 rb_mutex_unlock(self);
04498 beg = time(0);
04499 if (NIL_P(timeout)) {
04500 rb_ensure(rb_mutex_sleep_forever, Qnil, rb_mutex_lock, self);
04501 }
04502 else {
04503 rb_ensure(rb_mutex_wait_for, (VALUE)&t, rb_mutex_lock, self);
04504 }
04505 end = time(0) - beg;
04506 return INT2FIX(end);
04507 }
04508
04509
04510
04511
04512
04513
04514
04515
04516
04517
04518
04519
/* Ruby-level Mutex#sleep(timeout = nil) entry point. */
static VALUE
mutex_sleep(int argc, VALUE *argv, VALUE self)
{
    VALUE timeout;

    rb_scan_args(argc, argv, "01", &timeout);
    return rb_mutex_sleep(self, timeout);
}
04528
04529
04530
04531
04532
04533
04534
04535
04536
/* C API: lock `mutex`, call func(arg), and guarantee the unlock via
 * rb_ensure even if func raises. */
VALUE
rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg)
{
    rb_mutex_lock(mutex);
    return rb_ensure(func, arg, rb_mutex_unlock, mutex);
}
04543
04544
04545
04546
04547
04548
04549
04550
/* Ruby-level Mutex#synchronize: requires a block, runs it under lock. */
static VALUE
rb_mutex_synchronize_m(VALUE self, VALUE args)
{
    if (!rb_block_given_p()) {
        rb_raise(rb_eThreadError, "must be called with a block");
    }

    return rb_mutex_synchronize(self, rb_yield, Qundef);
}
04560
/* Allow (val != 0) or forbid locking this mutex from trap context. */
void rb_mutex_allow_trap(VALUE self, int val)
{
    rb_mutex_t *m;
    GetMutexPtr(self, m);

    m->allow_trap = val;
}
04568
04569
04570
04571
/* GC mark: the shield's DATA_PTR is itself a VALUE (the inner mutex). */
static void
thread_shield_mark(void *ptr)
{
    rb_gc_mark((VALUE)ptr);
}
04577
/* TypedData descriptor for ThreadShield (no free/size callbacks). */
static const rb_data_type_t thread_shield_data_type = {
    "thread_shield",
    {thread_shield_mark, 0, 0,},
};
04582
/* Allocate a shield wrapping a fresh anonymous mutex as its DATA_PTR. */
static VALUE
thread_shield_alloc(VALUE klass)
{
    return TypedData_Wrap_Struct(klass, &thread_shield_data_type, (void *)mutex_alloc(0));
}
04588
#define GetThreadShieldPtr(obj) ((VALUE)rb_check_typeddata((obj), &thread_shield_data_type))
/* The waiter count is packed into the FL_USER0..FL_USER19 flag bits of
 * the shield object itself. */
#define THREAD_SHIELD_WAITING_MASK (FL_USER0|FL_USER1|FL_USER2|FL_USER3|FL_USER4|FL_USER5|FL_USER6|FL_USER7|FL_USER8|FL_USER9|FL_USER10|FL_USER11|FL_USER12|FL_USER13|FL_USER14|FL_USER15|FL_USER16|FL_USER17|FL_USER18|FL_USER19)
#define THREAD_SHIELD_WAITING_SHIFT (FL_USHIFT)
#define rb_thread_shield_waiting(b) (int)((RBASIC(b)->flags&THREAD_SHIELD_WAITING_MASK)>>THREAD_SHIELD_WAITING_SHIFT)
04593
04594 static inline void
04595 rb_thread_shield_waiting_inc(VALUE b)
04596 {
04597 unsigned int w = rb_thread_shield_waiting(b);
04598 w++;
04599 if (w > (unsigned int)(THREAD_SHIELD_WAITING_MASK>>THREAD_SHIELD_WAITING_SHIFT))
04600 rb_raise(rb_eRuntimeError, "waiting count overflow");
04601 RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK;
04602 RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT);
04603 }
04604
04605 static inline void
04606 rb_thread_shield_waiting_dec(VALUE b)
04607 {
04608 unsigned int w = rb_thread_shield_waiting(b);
04609 if (!w) rb_raise(rb_eRuntimeError, "waiting count underflow");
04610 w--;
04611 RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK;
04612 RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT);
04613 }
04614
04615 VALUE
04616 rb_thread_shield_new(void)
04617 {
04618 VALUE thread_shield = thread_shield_alloc(rb_cThreadShield);
04619 rb_mutex_lock((VALUE)DATA_PTR(thread_shield));
04620 return thread_shield;
04621 }
04622
04623
04624
04625
04626
04627
04628
04629
04630
/* Wait on a shield.  Returns:
 *   Qnil   - the caller already owns the shield (would self-deadlock),
 *            or (after waking) other waiters remain,
 *   Qtrue  - the shield is still alive after the wait,
 *   Qfalse - the shield was destroyed. */
VALUE
rb_thread_shield_wait(VALUE self)
{
    VALUE mutex = GetThreadShieldPtr(self);
    rb_mutex_t *m;

    if (!mutex) return Qfalse;
    GetMutexPtr(mutex, m);
    if (m->th == GET_THREAD()) return Qnil;
    rb_thread_shield_waiting_inc(self);
    rb_mutex_lock(mutex);
    rb_thread_shield_waiting_dec(self);
    if (DATA_PTR(self)) return Qtrue;   /* still alive */
    rb_mutex_unlock(mutex);
    return rb_thread_shield_waiting(self) > 0 ? Qnil : Qfalse;
}
04647
04648
04649
04650
04651 VALUE
04652 rb_thread_shield_release(VALUE self)
04653 {
04654 VALUE mutex = GetThreadShieldPtr(self);
04655 rb_mutex_unlock(mutex);
04656 return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse;
04657 }
04658
04659
04660
04661
04662 VALUE
04663 rb_thread_shield_destroy(VALUE self)
04664 {
04665 VALUE mutex = GetThreadShieldPtr(self);
04666 DATA_PTR(self) = 0;
04667 rb_mutex_unlock(mutex);
04668 return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse;
04669 }
04670
04671
04672 static ID recursive_key;
04673
04674
04675
04676
04677
04678
04679
/* Return the per-thread recursion list for method key `sym`, creating
 * the thread-local registry hash and the per-method hash on demand. */
static VALUE
recursive_list_access(VALUE sym)
{
    volatile VALUE hash = rb_thread_local_aref(rb_thread_current(), recursive_key);
    VALUE list;
    if (NIL_P(hash) || !RB_TYPE_P(hash, T_HASH)) {
        /* first recursion guard on this thread: create the registry */
        hash = rb_hash_new();
        OBJ_UNTRUST(hash);
        rb_thread_local_aset(rb_thread_current(), recursive_key, hash);
        list = Qnil;
    }
    else {
        list = rb_hash_aref(hash, sym);
    }
    if (NIL_P(list) || !RB_TYPE_P(list, T_HASH)) {
        /* no list for this method yet */
        list = rb_hash_new();
        OBJ_UNTRUST(list);
        rb_hash_aset(hash, sym, list);
    }
    return list;
}
04701
04702
04703
04704
04705
04706
04707
/* Is (obj_id[, paired_obj_id]) already being processed on this list?
 * The stored entry for obj_id is either a single partner id or a hash
 * of partner ids (see recursive_push). */
static VALUE
recursive_check(VALUE list, VALUE obj_id, VALUE paired_obj_id)
{
#if SIZEOF_LONG == SIZEOF_VOIDP
  #define OBJ_ID_EQL(obj_id, other) ((obj_id) == (other))
#elif SIZEOF_LONG_LONG == SIZEOF_VOIDP
  /* object ids can be Bignums on LLP64, so compare by value */
  #define OBJ_ID_EQL(obj_id, other) (RB_TYPE_P((obj_id), T_BIGNUM) ? \
    rb_big_eql((obj_id), (other)) : ((obj_id) == (other)))
#endif

    VALUE pair_list = rb_hash_lookup2(list, obj_id, Qundef);
    if (pair_list == Qundef)
        return Qfalse;
    if (paired_obj_id) {
        if (!RB_TYPE_P(pair_list, T_HASH)) {
            /* single partner stored directly */
            if (!OBJ_ID_EQL(paired_obj_id, pair_list))
                return Qfalse;
        }
        else {
            /* multiple partners stored as a hash */
            if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id)))
                return Qfalse;
        }
    }
    return Qtrue;
}
04733
04734
04735
04736
04737
04738
04739
04740
04741
04742
/* Record obj (optionally paired with paired_obj) as in progress.
 * The entry for obj is Qtrue (unpaired), a single partner id, or —
 * once a second partner appears — a hash of partner ids. */
static void
recursive_push(VALUE list, VALUE obj, VALUE paired_obj)
{
    VALUE pair_list;

    if (!paired_obj) {
        rb_hash_aset(list, obj, Qtrue);
    }
    else if ((pair_list = rb_hash_lookup2(list, obj, Qundef)) == Qundef) {
        rb_hash_aset(list, obj, paired_obj);
    }
    else {
        if (!RB_TYPE_P(pair_list, T_HASH)){
            /* promote the single stored partner to a hash of partners */
            VALUE other_paired_obj = pair_list;
            pair_list = rb_hash_new();
            OBJ_UNTRUST(pair_list);
            rb_hash_aset(pair_list, other_paired_obj, Qtrue);
            rb_hash_aset(list, obj, pair_list);
        }
        rb_hash_aset(pair_list, paired_obj, Qtrue);
    }
}
04765
04766
04767
04768
04769
04770
04771
04772
04773
04774 static int
04775 recursive_pop(VALUE list, VALUE obj, VALUE paired_obj)
04776 {
04777 if (paired_obj) {
04778 VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
04779 if (pair_list == Qundef) {
04780 return 0;
04781 }
04782 if (RB_TYPE_P(pair_list, T_HASH)) {
04783 rb_hash_delete(pair_list, paired_obj);
04784 if (!RHASH_EMPTY_P(pair_list)) {
04785 return 1;
04786 }
04787 }
04788 }
04789 rb_hash_delete(list, obj);
04790 return 1;
04791 }
04792
/* Argument bundle threaded through exec_recursive / exec_recursive_i. */
struct exec_recursive_params {
    VALUE (*func) (VALUE, VALUE, int);  /* user callback: (obj, arg, recursive_p) */
    VALUE list;                         /* recursion list for the current method */
    VALUE obj;                          /* object passed back to func */
    VALUE objid;                        /* rb_obj_id(obj), the registration key */
    VALUE pairid;                       /* id of the paired object, or 0 */
    VALUE arg;                          /* opaque user argument */
};
04801
/*
 * Body executed by exec_recursive (directly, or via rb_catch_obj in the
 * outermost case — then +tag+ is the catch tag, otherwise 0 and unused).
 * Registers objid/pairid, runs the callback under tag protection so the
 * registration is popped even on a non-local exit, then re-raises any
 * captured jump.
 */
static VALUE
exec_recursive_i(VALUE tag, struct exec_recursive_params *p)
{
    VALUE result = Qundef;
    int state;

    recursive_push(p->list, p->objid, p->pairid);
    PUSH_TAG();
    if ((state = EXEC_TAG()) == 0) {
        result = (*p->func)(p->obj, p->arg, FALSE);
    }
    POP_TAG();
    /* always unregister, even when func threw/raised (state != 0) */
    recursive_pop(p->list, p->objid, p->pairid);
    if (state)
        JUMP_TAG(state);
    return result;
}
04819
04820
04821
04822
04823
04824
04825
04826
04827
04828
04829
04830
/*
 * Calls func(obj, arg, recursive_p) while detecting recursion on obj
 * (and, when pairid != 0, on the (obj, pairid) pair) for the current
 * method.  On re-entry func is called with recursive_p = TRUE instead
 * of recursing.
 *
 * With outer != 0 ("outermost" mode, used by Thread#inspect-style
 * callers): the outermost invocation wraps the call in rb_catch_obj;
 * any *inner* recursive invocation throws back to it (rb_throw_obj with
 * p.list as both tag and value), so the outermost frame — not the inner
 * one — produces the recursive result.
 */
static VALUE
exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer)
{
    VALUE result = Qundef;
    const ID mid = rb_frame_last_func();
    const VALUE sym = mid ? ID2SYM(mid) : ID2SYM(idNULL);
    struct exec_recursive_params p;
    int outermost;
    p.list = recursive_list_access(sym);
    p.objid = rb_obj_id(obj);
    p.obj = obj;
    p.pairid = pairid;
    p.arg = arg;
    /* ID2SYM(recursive_key) marks "an outermost frame is active" in the list */
    outermost = outer && !recursive_check(p.list, ID2SYM(recursive_key), 0);

    if (recursive_check(p.list, p.objid, pairid)) {
        if (outer && !outermost) {
            /* inner recursion in outermost mode: unwind to the catch below */
            rb_throw_obj(p.list, p.list);
        }
        return (*func)(obj, arg, TRUE);
    }
    else {
        p.func = func;

        if (outermost) {
            recursive_push(p.list, ID2SYM(recursive_key), 0);
            result = rb_catch_obj(p.list, exec_recursive_i, (VALUE)&p);
            if (!recursive_pop(p.list, ID2SYM(recursive_key), 0)) {
                rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list "
                         "for %+"PRIsVALUE" in %+"PRIsVALUE,
                         sym, rb_thread_current());
            }
            /* result == p.list means an inner frame threw: recursion seen */
            if (result == p.list) {
                result = (*func)(obj, arg, TRUE);
            }
        }
        else {
            result = exec_recursive_i(0, &p);
        }
    }
    /* keep p live on the stack so its VALUEs stay visible to the
     * conservative GC until after the callback has run */
    *(volatile struct exec_recursive_params *)&p;
    return result;
}
04874
04875
04876
04877
04878
04879
/*
 * Calls func(obj, arg, recursive_p), with recursive_p = TRUE if this
 * method is already being executed on obj in the current thread.
 */
VALUE
rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
{
    return exec_recursive(func, obj, 0, arg, 0);
}
04885
04886
04887
04888
04889
04890
/*
 * Like rb_exec_recursive, but recursion is detected on the pair
 * (obj, paired_obj) rather than on obj alone.
 */
VALUE
rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
{
    return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 0);
}
04896
04897
04898
04899
04900
04901
04902
/*
 * Like rb_exec_recursive, but in "outermost" mode: on recursion the
 * OUTERMOST invocation (not the innermost) receives recursive_p = TRUE
 * (inner frames throw back to it via exec_recursive's catch).
 */
VALUE
rb_exec_recursive_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
{
    return exec_recursive(func, obj, 0, arg, 1);
}
04908
04909
04910
04911
04912
04913
04914
/*
 * Combination of rb_exec_recursive_paired and rb_exec_recursive_outer:
 * pair-keyed recursion detection, reported to the outermost invocation.
 */
VALUE
rb_exec_recursive_paired_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
{
    return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 1);
}
04920
04921
04922
04923
04924
04925
04926
04927
04928
/*
 * Thread#backtrace — delegates to the VM backtrace implementation;
 * argc/argv carry the optional start/length or range arguments.
 */
static VALUE
rb_thread_backtrace_m(int argc, VALUE *argv, VALUE thval)
{
    return vm_thread_backtrace(argc, argv, thval);
}
04934
04935
04936
04937
04938
04939
04940
04941
04942
04943
04944
04945
/*
 * Thread#backtrace_locations — like Thread#backtrace but returns
 * Thread::Backtrace::Location objects; delegates to the VM.
 */
static VALUE
rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval)
{
    return vm_thread_backtrace_locations(argc, argv, thval);
}
04951
04952
04953
04954
04955
04956
04957
04958
04959
04960
04961
04962
04963
04964
04965
04966
04967
04968
04969
04970
04971
04972
04973
04974
/*
 * Registers the Thread, ThreadGroup, Mutex and ThreadError classes with
 * their methods, then bootstraps threading state for the already-running
 * main thread (GVL, interrupt locks and queues, default thread group)
 * and starts the timer thread.
 */
void
Init_Thread(void)
{
#undef rb_intern
#define rb_intern(str) rb_intern_const(str)

    VALUE cThGroup;
    rb_thread_t *th = GET_THREAD();

    /* timing symbols accepted by Thread.handle_interrupt */
    sym_never = ID2SYM(rb_intern("never"));
    sym_immediate = ID2SYM(rb_intern("immediate"));
    sym_on_blocking = ID2SYM(rb_intern("on_blocking"));

    rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
    rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
    rb_define_singleton_method(rb_cThread, "fork", thread_start, -2);
    rb_define_singleton_method(rb_cThread, "main", rb_thread_s_main, 0);
    rb_define_singleton_method(rb_cThread, "current", thread_s_current, 0);
    rb_define_singleton_method(rb_cThread, "stop", rb_thread_stop, 0);
    rb_define_singleton_method(rb_cThread, "kill", rb_thread_s_kill, 1);
    rb_define_singleton_method(rb_cThread, "exit", rb_thread_exit, 0);
    rb_define_singleton_method(rb_cThread, "pass", thread_s_pass, 0);
    rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0);
    rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0);
    rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1);
#if THREAD_DEBUG < 0
    rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0);
    rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1);
#endif
    rb_define_singleton_method(rb_cThread, "handle_interrupt", rb_thread_s_handle_interrupt, 1);
    rb_define_singleton_method(rb_cThread, "pending_interrupt?", rb_thread_s_pending_interrupt_p, -1);
    rb_define_method(rb_cThread, "pending_interrupt?", rb_thread_pending_interrupt_p, -1);

    rb_define_method(rb_cThread, "initialize", thread_initialize, -2);
    rb_define_method(rb_cThread, "raise", thread_raise_m, -1);
    rb_define_method(rb_cThread, "join", thread_join_m, -1);
    rb_define_method(rb_cThread, "value", thread_value, 0);
    rb_define_method(rb_cThread, "kill", rb_thread_kill, 0);
    rb_define_method(rb_cThread, "terminate", rb_thread_kill, 0);
    rb_define_method(rb_cThread, "exit", rb_thread_kill, 0);
    rb_define_method(rb_cThread, "run", rb_thread_run, 0);
    rb_define_method(rb_cThread, "wakeup", rb_thread_wakeup, 0);
    rb_define_method(rb_cThread, "[]", rb_thread_aref, 1);
    rb_define_method(rb_cThread, "[]=", rb_thread_aset, 2);
    rb_define_method(rb_cThread, "key?", rb_thread_key_p, 1);
    rb_define_method(rb_cThread, "keys", rb_thread_keys, 0);
    rb_define_method(rb_cThread, "priority", rb_thread_priority, 0);
    rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1);
    rb_define_method(rb_cThread, "status", rb_thread_status, 0);
    rb_define_method(rb_cThread, "thread_variable_get", rb_thread_variable_get, 1);
    rb_define_method(rb_cThread, "thread_variable_set", rb_thread_variable_set, 2);
    rb_define_method(rb_cThread, "thread_variables", rb_thread_variables, 0);
    rb_define_method(rb_cThread, "thread_variable?", rb_thread_variable_p, 1);
    rb_define_method(rb_cThread, "alive?", rb_thread_alive_p, 0);
    rb_define_method(rb_cThread, "stop?", rb_thread_stop_p, 0);
    rb_define_method(rb_cThread, "abort_on_exception", rb_thread_abort_exc, 0);
    rb_define_method(rb_cThread, "abort_on_exception=", rb_thread_abort_exc_set, 1);
    rb_define_method(rb_cThread, "safe_level", rb_thread_safe_level, 0);
    rb_define_method(rb_cThread, "group", rb_thread_group, 0);
    rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1);
    rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1);

    rb_define_method(rb_cThread, "inspect", rb_thread_inspect, 0);

    /* pre-built frozen exception raised when a stream is closed
     * concurrently; shared via vm->special_exceptions */
    closed_stream_error = rb_exc_new2(rb_eIOError, "stream closed");
    OBJ_TAINT(closed_stream_error);
    OBJ_FREEZE(closed_stream_error);

    cThGroup = rb_define_class("ThreadGroup", rb_cObject);
    rb_define_alloc_func(cThGroup, thgroup_s_alloc);
    rb_define_method(cThGroup, "list", thgroup_list, 0);
    rb_define_method(cThGroup, "enclose", thgroup_enclose, 0);
    rb_define_method(cThGroup, "enclosed?", thgroup_enclosed_p, 0);
    rb_define_method(cThGroup, "add", thgroup_add, 1);

    {
        /* the main thread belongs to ThreadGroup::Default */
        th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup);
        rb_define_const(cThGroup, "Default", th->thgroup);
    }

    rb_cMutex = rb_define_class("Mutex", rb_cObject);
    rb_define_alloc_func(rb_cMutex, mutex_alloc);
    rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
    rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
    rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
    rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
    rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
    rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
    rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0);
    rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);

    recursive_key = rb_intern("__recursive_key__");
    rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError);

    /* main thread / VM threading bootstrap */
    {
        {
            /* acquire the GVL on behalf of the main thread */
            gvl_init(th->vm);
            gvl_acquire(th->vm, th);
            native_mutex_initialize(&th->vm->thread_destruct_lock);
            native_mutex_initialize(&th->interrupt_lock);

            th->pending_interrupt_queue = rb_ary_tmp_new(0);
            th->pending_interrupt_queue_checked = 0;
            th->pending_interrupt_mask_stack = rb_ary_tmp_new(0);

            th->interrupt_mask = 0;
        }
    }

    rb_thread_create_timer_thread();

    /* references the symbol so it is not reported as unused — presumably
     * to silence an unused-function warning; TODO confirm */
    (void)native_mutex_trylock;
}
05092
/*
 * Returns nonzero iff the calling native thread is known to the Ruby VM,
 * i.e. it has an rb_thread_t registered in native thread-local storage.
 */
int
ruby_native_thread_p(void)
{
    return ruby_thread_from_native() != 0;
}
05100
/*
 * st_foreach callback over vm->living_threads (key = thread VALUE, val
 * unused).  Sets *found = 1 when a thread is still "live": not sleeping
 * forever, has a pending interrupt, or is blocked on a mutex that is
 * held or has a waiter being signalled.  Stops iterating once found.
 */
static int
check_deadlock_i(st_data_t key, st_data_t val, int *found)
{
    VALUE thval = key;
    rb_thread_t *th;
    GetThreadPtr(thval, th);

    if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th)) {
        *found = 1;
    }
    else if (th->locking_mutex) {
        rb_mutex_t *mutex;
        GetMutexPtr(th->locking_mutex, mutex);

        /* lock around inspection of mutex ownership/waiter state */
        native_mutex_lock(&mutex->lock);
        if (mutex->th == th || (!mutex->th && mutex->cond_waiting)) {
            *found = 1;
        }
        native_mutex_unlock(&mutex->lock);
    }

    return (*found) ? ST_STOP : ST_CONTINUE;
}
05124
#ifdef DEBUG_DEADLOCK_CHECK
/*
 * Debug-only st_foreach callback: dumps each living thread's status,
 * interrupt flag, and (if blocked) its mutex owner/waiter state.
 * Enabled only when DEBUG_DEADLOCK_CHECK is defined.
 */
static int
debug_i(st_data_t key, st_data_t val, int *found)
{
    VALUE thval = key;
    rb_thread_t *th;
    GetThreadPtr(thval, th);

    printf("th:%p %d %d", th, th->status, th->interrupt_flag);
    if (th->locking_mutex) {
        rb_mutex_t *mutex;
        GetMutexPtr(th->locking_mutex, mutex);

        native_mutex_lock(&mutex->lock);
        printf(" %p %d\n", mutex->th, mutex->cond_waiting);
        native_mutex_unlock(&mutex->lock);
    }
    else
        puts("");

    return ST_CONTINUE;
}
#endif
05148
/*
 * Deadlock detection: when every living thread is asleep
 * (vm->sleeper == living thread count) and none is interruptible or
 * waiting on a workable mutex, raises a fatal "Deadlock?" error on the
 * main thread.  Skipped while a patrol (timer) thread other than the
 * caller is active.
 */
static void
rb_check_deadlock(rb_vm_t *vm)
{
    int found = 0;

    /* fast path: some thread is still runnable */
    if (vm_living_thread_num(vm) > vm->sleeper) return;
    if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
    if (patrol_thread && patrol_thread != GET_THREAD()) return;

    st_foreach(vm->living_threads, check_deadlock_i, (st_data_t)&found);

    if (!found) {
        VALUE argv[2];
        argv[0] = rb_eFatal;
        argv[1] = rb_str_new2("No live threads left. Deadlock?");
#ifdef DEBUG_DEADLOCK_CHECK
        printf("%d %d %p %p\n", vm->living_threads->num_entries, vm->sleeper, GET_THREAD(), vm->main_thread);
        st_foreach(vm->living_threads, debug_i, (st_data_t)0);
#endif
        /* the main thread is about to be woken by the raise */
        vm->sleeper--;
        rb_threadptr_raise(vm->main_thread, 2, argv);
    }
}
05172
05173 static void
05174 update_coverage(rb_event_flag_t event, VALUE proc, VALUE self, ID id, VALUE klass)
05175 {
05176 VALUE coverage = GET_THREAD()->cfp->iseq->coverage;
05177 if (coverage && RBASIC(coverage)->klass == 0) {
05178 long line = rb_sourceline() - 1;
05179 long count;
05180 if (RARRAY_PTR(coverage)[line] == Qnil) {
05181 return;
05182 }
05183 count = FIX2LONG(RARRAY_PTR(coverage)[line]) + 1;
05184 if (POSFIXABLE(count)) {
05185 RARRAY_PTR(coverage)[line] = LONG2FIX(count);
05186 }
05187 }
05188 }
05189
/* Returns the VM-wide coverages table (path => line-counter array), or
 * Qfalse when coverage measurement is disabled. */
VALUE
rb_get_coverages(void)
{
    return GET_VM()->coverages;
}
05195
/* Installs +coverages+ as the VM-wide coverage table and hooks
 * update_coverage into line events to start measuring. */
void
rb_set_coverages(VALUE coverages)
{
    GET_VM()->coverages = coverages;
    rb_add_event_hook(update_coverage, RUBY_EVENT_COVERAGE, Qnil);
}
05202
/* Stops coverage measurement: clears the table and removes the
 * update_coverage event hook. */
void
rb_reset_coverages(void)
{
    GET_VM()->coverages = Qfalse;
    rb_remove_event_hook(update_coverage);
}
05209
05210 VALUE
05211 rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data)
05212 {
05213 VALUE interrupt_mask = rb_hash_new();
05214 rb_thread_t *cur_th = GET_THREAD();
05215
05216 rb_hash_aset(interrupt_mask, rb_cObject, sym_never);
05217 rb_ary_push(cur_th->pending_interrupt_mask_stack, interrupt_mask);
05218
05219 return rb_ensure(b_proc, data, rb_ary_pop, cur_th->pending_interrupt_mask_stack);
05220 }
05221