00001 #include <ruby.h>
00002
00003 enum {
00004 CONDVAR_WAITERS = 0
00005 };
00006
00007 enum {
00008 QUEUE_QUE = 0,
00009 QUEUE_WAITERS = 1,
00010 SZQUEUE_WAITERS = 2,
00011 SZQUEUE_MAX = 3
00012 };
00013
00014 #define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
00015
00016 #define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE)
00017 #define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS)
00018 #define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS)
00019 #define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX)
00020 #define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
00021
00022 static VALUE
00023 get_array(VALUE obj, int idx)
00024 {
00025 VALUE ary = RSTRUCT_GET(obj, idx);
00026 if (!RB_TYPE_P(ary, T_ARRAY)) {
00027 rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
00028 }
00029 return ary;
00030 }
00031
00032 static VALUE
00033 ary_buf_new(void)
00034 {
00035 return rb_ary_tmp_new(1);
00036 }
00037
00038 static void
00039 wakeup_first_thread(VALUE list)
00040 {
00041 VALUE thread;
00042
00043 while (!NIL_P(thread = rb_ary_shift(list))) {
00044 if (RTEST(rb_thread_wakeup_alive(thread))) break;
00045 }
00046 }
00047
00048 static void
00049 wakeup_all_threads(VALUE list)
00050 {
00051 VALUE thread;
00052 long i;
00053
00054 for (i=0; i<RARRAY_LEN(list); i++) {
00055 thread = RARRAY_AREF(list, i);
00056 rb_thread_wakeup_alive(thread);
00057 }
00058 rb_ary_clear(list);
00059 }
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097 static VALUE
00098 rb_condvar_initialize(VALUE self)
00099 {
00100 RSTRUCT_SET(self, CONDVAR_WAITERS, ary_buf_new());
00101 return self;
00102 }
00103
00104 struct sleep_call {
00105 VALUE mutex;
00106 VALUE timeout;
00107 };
00108
00109 static ID id_sleep;
00110
00111 static VALUE
00112 do_sleep(VALUE args)
00113 {
00114 struct sleep_call *p = (struct sleep_call *)args;
00115 return rb_funcall2(p->mutex, id_sleep, 1, &p->timeout);
00116 }
00117
00118 static VALUE
00119 delete_current_thread(VALUE ary)
00120 {
00121 return rb_ary_delete(ary, rb_thread_current());
00122 }
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134 static VALUE
00135 rb_condvar_wait(int argc, VALUE *argv, VALUE self)
00136 {
00137 VALUE waiters = GET_CONDVAR_WAITERS(self);
00138 VALUE mutex, timeout;
00139 struct sleep_call args;
00140
00141 rb_scan_args(argc, argv, "11", &mutex, &timeout);
00142
00143 args.mutex = mutex;
00144 args.timeout = timeout;
00145 rb_ary_push(waiters, rb_thread_current());
00146 rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters);
00147
00148 return self;
00149 }
00150
00151
00152
00153
00154
00155
00156
00157 static VALUE
00158 rb_condvar_signal(VALUE self)
00159 {
00160 wakeup_first_thread(GET_CONDVAR_WAITERS(self));
00161 return self;
00162 }
00163
00164
00165
00166
00167
00168
00169
00170 static VALUE
00171 rb_condvar_broadcast(VALUE self)
00172 {
00173 wakeup_all_threads(GET_CONDVAR_WAITERS(self));
00174 return self;
00175 }
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211 static VALUE
00212 rb_queue_initialize(VALUE self)
00213 {
00214 RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
00215 RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
00216 return self;
00217 }
00218
00219 static VALUE
00220 queue_do_push(VALUE self, VALUE obj)
00221 {
00222 rb_ary_push(GET_QUEUE_QUE(self), obj);
00223 wakeup_first_thread(GET_QUEUE_WAITERS(self));
00224 return self;
00225 }
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237 static VALUE
00238 rb_queue_push(VALUE self, VALUE obj)
00239 {
00240 return queue_do_push(self, obj);
00241 }
00242
00243 static unsigned long
00244 queue_length(VALUE self)
00245 {
00246 return RARRAY_LEN(GET_QUEUE_QUE(self));
00247 }
00248
00249 static unsigned long
00250 queue_num_waiting(VALUE self)
00251 {
00252 return RARRAY_LEN(GET_QUEUE_WAITERS(self));
00253 }
00254
00255 struct waiting_delete {
00256 VALUE waiting;
00257 VALUE th;
00258 };
00259
00260 static VALUE
00261 queue_delete_from_waiting(struct waiting_delete *p)
00262 {
00263 rb_ary_delete(p->waiting, p->th);
00264 return Qnil;
00265 }
00266
00267 static VALUE
00268 queue_sleep(VALUE arg)
00269 {
00270 rb_thread_sleep_deadly();
00271 return Qnil;
00272 }
00273
00274 static VALUE
00275 queue_do_pop(VALUE self, VALUE should_block)
00276 {
00277 struct waiting_delete args;
00278 args.waiting = GET_QUEUE_WAITERS(self);
00279 args.th = rb_thread_current();
00280
00281 while (queue_length(self) == 0) {
00282 if (!(int)should_block) {
00283 rb_raise(rb_eThreadError, "queue empty");
00284 }
00285 rb_ary_push(args.waiting, args.th);
00286 rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
00287 }
00288
00289 return rb_ary_shift(GET_QUEUE_QUE(self));
00290 }
00291
00292 static VALUE
00293 queue_pop_should_block(int argc, VALUE *argv)
00294 {
00295 VALUE should_block = Qtrue;
00296 switch (argc) {
00297 case 0:
00298 break;
00299 case 1:
00300 should_block = RTEST(argv[0]) ? Qfalse : Qtrue;
00301 break;
00302 default:
00303 rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
00304 }
00305 return should_block;
00306 }
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322 static VALUE
00323 rb_queue_pop(int argc, VALUE *argv, VALUE self)
00324 {
00325 VALUE should_block = queue_pop_should_block(argc, argv);
00326 return queue_do_pop(self, should_block);
00327 }
00328
00329
00330
00331
00332
00333
00334
00335
00336 static VALUE
00337 rb_queue_empty_p(VALUE self)
00338 {
00339 return queue_length(self) == 0 ? Qtrue : Qfalse;
00340 }
00341
00342
00343
00344
00345
00346
00347
00348 static VALUE
00349 rb_queue_clear(VALUE self)
00350 {
00351 rb_ary_clear(GET_QUEUE_QUE(self));
00352 return self;
00353 }
00354
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364 static VALUE
00365 rb_queue_length(VALUE self)
00366 {
00367 unsigned long len = queue_length(self);
00368 return ULONG2NUM(len);
00369 }
00370
00371
00372
00373
00374
00375
00376
00377 static VALUE
00378 rb_queue_num_waiting(VALUE self)
00379 {
00380 unsigned long len = queue_num_waiting(self);
00381 return ULONG2NUM(len);
00382 }
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400 static VALUE
00401 rb_szqueue_initialize(VALUE self, VALUE vmax)
00402 {
00403 long max;
00404
00405 max = NUM2LONG(vmax);
00406 if (max <= 0) {
00407 rb_raise(rb_eArgError, "queue size must be positive");
00408 }
00409
00410 RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
00411 RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
00412 RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
00413 RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
00414
00415 return self;
00416 }
00417
00418
00419
00420
00421
00422
00423
00424 static VALUE
00425 rb_szqueue_max_get(VALUE self)
00426 {
00427 return GET_SZQUEUE_MAX(self);
00428 }
00429
00430
00431
00432
00433
00434
00435
00436
00437 static VALUE
00438 rb_szqueue_max_set(VALUE self, VALUE vmax)
00439 {
00440 long max = NUM2LONG(vmax), diff = 0;
00441 VALUE t;
00442
00443 if (max <= 0) {
00444 rb_raise(rb_eArgError, "queue size must be positive");
00445 }
00446 if ((unsigned long)max > GET_SZQUEUE_ULONGMAX(self)) {
00447 diff = max - GET_SZQUEUE_ULONGMAX(self);
00448 }
00449 RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
00450 while (diff-- > 0 && !NIL_P(t = rb_ary_shift(GET_SZQUEUE_WAITERS(self)))) {
00451 rb_thread_wakeup_alive(t);
00452 }
00453 return vmax;
00454 }
00455
00456
00457
00458
00459
00460
00461
00462
00463
00464
00465
00466
00467
00468 static VALUE
00469 rb_szqueue_push(VALUE self, VALUE obj)
00470 {
00471 struct waiting_delete args;
00472 args.waiting = GET_SZQUEUE_WAITERS(self);
00473 args.th = rb_thread_current();
00474
00475 while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
00476 rb_ary_push(args.waiting, args.th);
00477 rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
00478 }
00479 return queue_do_push(self, obj);
00480 }
00481
00482 static VALUE
00483 szqueue_do_pop(VALUE self, VALUE should_block)
00484 {
00485 VALUE retval = queue_do_pop(self, should_block);
00486
00487 if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) {
00488 wakeup_first_thread(GET_SZQUEUE_WAITERS(self));
00489 }
00490
00491 return retval;
00492 }
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508 static VALUE
00509 rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
00510 {
00511 VALUE should_block = queue_pop_should_block(argc, argv);
00512 return szqueue_do_pop(self, should_block);
00513 }
00514
00515
00516
00517
00518
00519
00520
00521 static VALUE
00522 rb_szqueue_clear(VALUE self)
00523 {
00524 rb_ary_clear(GET_QUEUE_QUE(self));
00525 wakeup_all_threads(GET_SZQUEUE_WAITERS(self));
00526 return self;
00527 }
00528
00529
00530
00531
00532
00533
00534
00535 static VALUE
00536 rb_szqueue_num_waiting(VALUE self)
00537 {
00538 long len = queue_num_waiting(self);
00539 len += RARRAY_LEN(GET_SZQUEUE_WAITERS(self));
00540 return ULONG2NUM(len);
00541 }
00542
00543 #ifndef UNDER_THREAD
00544 #define UNDER_THREAD 1
00545 #endif
00546
00547 static VALUE
00548 undumpable(VALUE obj)
00549 {
00550 rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
00551 UNREACHABLE;
00552 }
00553
00554 void
00555 Init_thread(void)
00556 {
00557 #if UNDER_THREAD
00558 #define ALIAS_GLOBAL_CONST(name) do { \
00559 ID id = rb_intern_const(#name); \
00560 if (!rb_const_defined_at(rb_cObject, id)) { \
00561 rb_const_set(rb_cObject, id, rb_c##name); \
00562 } \
00563 } while (0)
00564 #define OUTER rb_cThread
00565 #else
00566 #define ALIAS_GLOBAL_CONST(name) do { } while (0)
00567 #define OUTER 0
00568 #endif
00569
00570 VALUE rb_cConditionVariable = rb_struct_define_without_accessor_under(
00571 OUTER,
00572 "ConditionVariable", rb_cObject, rb_struct_alloc_noinit,
00573 "waiters", NULL);
00574 VALUE rb_cQueue = rb_struct_define_without_accessor_under(
00575 OUTER,
00576 "Queue", rb_cObject, rb_struct_alloc_noinit,
00577 "que", "waiters", NULL);
00578 VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under(
00579 OUTER,
00580 "SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
00581 "que", "waiters", "queue_waiters", "size", NULL);
00582
00583 #if 0
00584 rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject);
00585 rb_cQueue = rb_define_class("Queue", rb_cObject);
00586 rb_cSizedQueue = rb_define_class("SizedQueue", rb_cObject);
00587 #endif
00588
00589 id_sleep = rb_intern("sleep");
00590
00591 rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
00592 rb_define_method(rb_cConditionVariable, "marshal_dump", undumpable, 0);
00593 rb_undef_method(rb_cConditionVariable, "initialize_copy");
00594 rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
00595 rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
00596 rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
00597
00598 rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
00599 rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
00600 rb_undef_method(rb_cQueue, "initialize_copy");
00601 rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
00602 rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
00603 rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
00604 rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
00605 rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
00606 rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
00607
00608
00609 rb_define_alias(rb_cQueue, "enq", "push");
00610
00611 rb_define_alias(rb_cQueue, "<<", "push");
00612
00613 rb_define_alias(rb_cQueue, "deq", "pop");
00614
00615 rb_define_alias(rb_cQueue, "shift", "pop");
00616
00617 rb_define_alias(rb_cQueue, "size", "length");
00618
00619 rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
00620 rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
00621 rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
00622 rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, 1);
00623 rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
00624 rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
00625 rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
00626
00627
00628 rb_define_alias(rb_cSizedQueue, "enq", "push");
00629
00630 rb_define_alias(rb_cSizedQueue, "<<", "push");
00631
00632 rb_define_alias(rb_cSizedQueue, "deq", "pop");
00633
00634 rb_define_alias(rb_cSizedQueue, "shift", "pop");
00635
00636 rb_provide("thread.rb");
00637 ALIAS_GLOBAL_CONST(ConditionVariable);
00638 ALIAS_GLOBAL_CONST(Queue);
00639 ALIAS_GLOBAL_CONST(SizedQueue);
00640 }
00641