diff --git a/src/rt/circular_buffer.cpp b/src/rt/circular_buffer.cpp index aa0127d8c255..ba098bbee3f2 100644 --- a/src/rt/circular_buffer.cpp +++ b/src/rt/circular_buffer.cpp @@ -123,7 +123,7 @@ circular_buffer::dequeue(void *dst) { if (dst != NULL) { memcpy(dst, &_buffer[_next], unit_sz); } - DLOG(sched, mem, "shifted data from index %d", _next); + //DLOG(sched, mem, "shifted data from index %d", _next); _unread -= unit_sz; _next += unit_sz; if (_next == _buffer_sz) { diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 1eb826027988..9cdf07cd759f 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -53,13 +53,13 @@ rust_kernel::destroy_scheduler(rust_scheduler *sched) { } void rust_kernel::create_schedulers() { - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { threads.push(create_scheduler(i)); } } void rust_kernel::destroy_schedulers() { - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { destroy_scheduler(threads[i]); } } @@ -106,7 +106,7 @@ rust_kernel::get_port_handle(rust_port *port) { void rust_kernel::log_all_scheduler_state() { - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { threads[i]->log_state(); } } @@ -252,12 +252,12 @@ rust_kernel::signal_kernel_lock() { int rust_kernel::start_task_threads() { - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { rust_scheduler *thread = threads[i]; thread->start(); } - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { rust_scheduler *thread = threads[i]; thread->join(); } @@ -271,6 +271,12 @@ rust_kernel::create_task(rust_task *spawner, const char *name) { return threads[rand(&rctx) % num_threads]->create_task(spawner, name); } +void rust_kernel::wakeup_schedulers() { + for(size_t i = 0; i < num_threads; ++i) { + threads[i]->lock.signal_all(); + } +} + #ifdef __WIN32__ void rust_kernel::win32_require(LPCTSTR fn, BOOL ok) { diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 8be9bb96e90c..cf9d88e00164 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -106,6 +106,7 @@ public: bool is_deadlocked(); void signal_kernel_lock(); + void wakeup_schedulers(); /** * Notifies the kernel whenever a message has been enqueued . This gives diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp index 437be04e272c..4f19b0c681b1 100644 --- a/src/rt/rust_scheduler.cpp +++ b/src/rt/rust_scheduler.cpp @@ -87,6 +87,7 @@ rust_scheduler::reap_dead_tasks(int id) { I(this, lock.lock_held_by_current_thread()); for (size_t i = 0; i < dead_tasks.length(); ) { rust_task *task = dead_tasks[i]; + task->lock.lock(); // Make sure this task isn't still running somewhere else... if (task->ref_count == 0 && task->can_schedule(id)) { I(this, task->tasks_waiting_to_join.is_empty()); @@ -94,10 +95,13 @@ rust_scheduler::reap_dead_tasks(int id) { DLOG(this, task, "deleting unreferenced dead task %s @0x%" PRIxPTR, task->name, task); + task->lock.unlock(); delete task; sync::decrement(kernel->live_tasks); + kernel->wakeup_schedulers(); continue; } + task->lock.unlock(); ++i; } } @@ -206,21 +210,15 @@ rust_scheduler::start_main_loop() { rust_task *scheduled_task = schedule_task(id); - // The scheduler busy waits until a task is available for scheduling. - // Eventually we'll want a smarter way to do this, perhaps sleep - // for a minimum amount of time. - if (scheduled_task == NULL) { log_state(); DLOG(this, task, "all tasks are blocked, scheduler id %d yielding ...", id); - lock.unlock(); - sync::sleep(100); - lock.lock(); - DLOG(this, task, - "scheduler resuming ..."); + lock.timed_wait(100000); reap_dead_tasks(id); + DLOG(this, task, + "scheduler %d resuming ...", id); continue; } diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 538a9b34e569..05efc12e4d43 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -105,7 +105,7 @@ rust_task::~rust_task() /* FIXME: tighten this up, there are some more assertions that hold at task-lifecycle events. */ - // I(sched, ref_count == 0 || + I(sched, ref_count == 0); // || // (ref_count == 1 && this == sched->root_task)); del_stk(this, stk); @@ -167,6 +167,7 @@ rust_task::start(uintptr_t spawnee_fn, yield_timer.reset_us(0); transition(&sched->newborn_tasks, &sched->running_tasks); + sched->lock.signal(); } void @@ -212,6 +213,8 @@ rust_task::kill() { if (NULL == supervisor && propagate_failure) sched->fail(); + sched->lock.signal(); + LOG(this, task, "preparing to unwind task: 0x%" PRIxPTR, this); // run_on_resume(rust_unwind_glue); } @@ -442,12 +445,15 @@ rust_task::wakeup(rust_cond *from) { if(_on_wakeup) { _on_wakeup->on_wakeup(); } + + sched->lock.signal(); } void rust_task::die() { scoped_lock with(lock); transition(&sched->running_tasks, &sched->dead_tasks); + sched->lock.signal(); } void diff --git a/src/rt/sync/lock_and_signal.cpp b/src/rt/sync/lock_and_signal.cpp index 3d0d10138890..f4c3778837d0 100644 --- a/src/rt/sync/lock_and_signal.cpp +++ b/src/rt/sync/lock_and_signal.cpp @@ -10,7 +10,9 @@ #include "lock_and_signal.h" #if defined(__WIN32__) -lock_and_signal::lock_and_signal() { +lock_and_signal::lock_and_signal() + : alive(true) +{ // FIXME: In order to match the behavior of pthread_cond_broadcast on // Windows, we create manual reset events. This however breaks the // behavior of pthread_cond_signal, fixing this is quite involved: @@ -22,7 +24,7 @@ lock_and_signal::lock_and_signal() { #else lock_and_signal::lock_and_signal() - : _locked(false) + : _locked(false), alive(true) { CHECKED(pthread_cond_init(&_cond, NULL)); CHECKED(pthread_mutex_init(&_mutex, NULL)); @@ -36,6 +38,7 @@ lock_and_signal::~lock_and_signal() { CHECKED(pthread_cond_destroy(&_cond)); CHECKED(pthread_mutex_destroy(&_mutex)); #endif + alive = false; } void lock_and_signal::lock() { @@ -65,11 +68,14 @@ void lock_and_signal::wait() { timed_wait(0); } -void lock_and_signal::timed_wait(size_t timeout_in_ns) { +bool lock_and_signal::timed_wait(size_t timeout_in_ns) { + _locked = false; + bool rv = true; #if defined(__WIN32__) LeaveCriticalSection(&_cs); WaitForSingleObject(_event, INFINITE); EnterCriticalSection(&_cs); + _holding_thread = GetCurrentThreadId(); #else if (timeout_in_ns == 0) { CHECKED(pthread_cond_wait(&_cond, &_mutex)); @@ -79,9 +85,29 @@ void lock_and_signal::timed_wait(size_t timeout_in_ns) { timespec time_spec; time_spec.tv_sec = time_val.tv_sec + 0; time_spec.tv_nsec = time_val.tv_usec * 1000 + timeout_in_ns; - CHECKED(pthread_cond_timedwait(&_cond, &_mutex, &time_spec)); + if(time_spec.tv_nsec >= 1000000000) { + time_spec.tv_sec++; + time_spec.tv_nsec -= 1000000000; + } + int cond_wait_status + = pthread_cond_timedwait(&_cond, &_mutex, &time_spec); + switch(cond_wait_status) { + case 0: + // successfully grabbed the lock. + break; + case ETIMEDOUT: + // Oops, we timed out. + rv = false; + break; + default: + // Error + CHECKED(cond_wait_status); + } } + _holding_thread = pthread_self(); #endif + _locked = true; + return rv; } /** diff --git a/src/rt/sync/lock_and_signal.h b/src/rt/sync/lock_and_signal.h index 60c22958342f..6e656017115d 100644 --- a/src/rt/sync/lock_and_signal.h +++ b/src/rt/sync/lock_and_signal.h @@ -14,6 +14,9 @@ class lock_and_signal { pthread_t _holding_thread; #endif bool _locked; + + bool alive; + public: lock_and_signal(); virtual ~lock_and_signal(); @@ -21,7 +24,7 @@ public: void lock(); void unlock(); void wait(); - void timed_wait(size_t timeout_in_ns); + bool timed_wait(size_t timeout_in_ns); void signal(); void signal_all(); diff --git a/src/test/bench/task-perf-word-count.rs b/src/test/bench/task-perf-word-count.rs index 6e7e2bdba8aa..0a6b94c7f2ed 100644 --- a/src/test/bench/task-perf-word-count.rs +++ b/src/test/bench/task-perf-word-count.rs @@ -184,7 +184,6 @@ mod map_reduce { let m; ctrl |> m; - alt m { mapper_done. { // log_err "received mapper terminated.";