From 5302cde188bba80dd38c58eaafa792d621b0818c Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Mon, 25 Jul 2011 18:00:37 -0700 Subject: [PATCH] Made task threads wait instead of sleep, so they can be woken up. This appears to give us much better parallel performance. Also, commented out one more unsafe log and updated rust_kernel.cpp to compile under g++ --- src/rt/circular_buffer.cpp | 2 +- src/rt/rust_kernel.cpp | 16 ++++++++---- src/rt/rust_kernel.h | 1 + src/rt/rust_scheduler.cpp | 16 ++++++------ src/rt/rust_task.cpp | 8 +++++- src/rt/sync/lock_and_signal.cpp | 34 +++++++++++++++++++++++--- src/rt/sync/lock_and_signal.h | 5 +++- src/test/bench/task-perf-word-count.rs | 1 - 8 files changed, 61 insertions(+), 22 deletions(-) diff --git a/src/rt/circular_buffer.cpp b/src/rt/circular_buffer.cpp index aa0127d8c255..ba098bbee3f2 100644 --- a/src/rt/circular_buffer.cpp +++ b/src/rt/circular_buffer.cpp @@ -123,7 +123,7 @@ circular_buffer::dequeue(void *dst) { if (dst != NULL) { memcpy(dst, &_buffer[_next], unit_sz); } - DLOG(sched, mem, "shifted data from index %d", _next); + //DLOG(sched, mem, "shifted data from index %d", _next); _unread -= unit_sz; _next += unit_sz; if (_next == _buffer_sz) { diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 1eb826027988..9cdf07cd759f 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -53,13 +53,13 @@ rust_kernel::destroy_scheduler(rust_scheduler *sched) { } void rust_kernel::create_schedulers() { - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { threads.push(create_scheduler(i)); } } void rust_kernel::destroy_schedulers() { - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { destroy_scheduler(threads[i]); } } @@ -106,7 +106,7 @@ rust_kernel::get_port_handle(rust_port *port) { void rust_kernel::log_all_scheduler_state() { - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { threads[i]->log_state(); } } @@ -252,12 +252,12 @@ rust_kernel::signal_kernel_lock() { int rust_kernel::start_task_threads() { - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { rust_scheduler *thread = threads[i]; thread->start(); } - for(int i = 0; i < num_threads; ++i) { + for(size_t i = 0; i < num_threads; ++i) { rust_scheduler *thread = threads[i]; thread->join(); } @@ -271,6 +271,12 @@ rust_kernel::create_task(rust_task *spawner, const char *name) { return threads[rand(&rctx) % num_threads]->create_task(spawner, name); } +void rust_kernel::wakeup_schedulers() { + for(size_t i = 0; i < num_threads; ++i) { + threads[i]->lock.signal_all(); + } +} + #ifdef __WIN32__ void rust_kernel::win32_require(LPCTSTR fn, BOOL ok) { diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 8be9bb96e90c..cf9d88e00164 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -106,6 +106,7 @@ public: bool is_deadlocked(); void signal_kernel_lock(); + void wakeup_schedulers(); /** * Notifies the kernel whenever a message has been enqueued . This gives diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp index 437be04e272c..4f19b0c681b1 100644 --- a/src/rt/rust_scheduler.cpp +++ b/src/rt/rust_scheduler.cpp @@ -87,6 +87,7 @@ rust_scheduler::reap_dead_tasks(int id) { I(this, lock.lock_held_by_current_thread()); for (size_t i = 0; i < dead_tasks.length(); ) { rust_task *task = dead_tasks[i]; + task->lock.lock(); // Make sure this task isn't still running somewhere else... if (task->ref_count == 0 && task->can_schedule(id)) { I(this, task->tasks_waiting_to_join.is_empty()); @@ -94,10 +95,13 @@ rust_scheduler::reap_dead_tasks(int id) { DLOG(this, task, "deleting unreferenced dead task %s @0x%" PRIxPTR, task->name, task); + task->lock.unlock(); delete task; sync::decrement(kernel->live_tasks); + kernel->wakeup_schedulers(); continue; } + task->lock.unlock(); ++i; } } @@ -206,21 +210,15 @@ rust_scheduler::start_main_loop() { rust_task *scheduled_task = schedule_task(id); - // The scheduler busy waits until a task is available for scheduling. - // Eventually we'll want a smarter way to do this, perhaps sleep - // for a minimum amount of time. - if (scheduled_task == NULL) { log_state(); DLOG(this, task, "all tasks are blocked, scheduler id %d yielding ...", id); - lock.unlock(); - sync::sleep(100); - lock.lock(); - DLOG(this, task, - "scheduler resuming ..."); + lock.timed_wait(100000); reap_dead_tasks(id); + DLOG(this, task, + "scheduler %d resuming ...", id); continue; } diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 538a9b34e569..05efc12e4d43 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -105,7 +105,7 @@ rust_task::~rust_task() /* FIXME: tighten this up, there are some more assertions that hold at task-lifecycle events. */ - // I(sched, ref_count == 0 || + I(sched, ref_count == 0); // || // (ref_count == 1 && this == sched->root_task)); del_stk(this, stk); @@ -167,6 +167,7 @@ rust_task::start(uintptr_t spawnee_fn, yield_timer.reset_us(0); transition(&sched->newborn_tasks, &sched->running_tasks); + sched->lock.signal(); } void @@ -212,6 +213,8 @@ rust_task::kill() { if (NULL == supervisor && propagate_failure) sched->fail(); + sched->lock.signal(); + LOG(this, task, "preparing to unwind task: 0x%" PRIxPTR, this); // run_on_resume(rust_unwind_glue); } @@ -442,12 +445,15 @@ rust_task::wakeup(rust_cond *from) { if(_on_wakeup) { _on_wakeup->on_wakeup(); } + + sched->lock.signal(); } void rust_task::die() { scoped_lock with(lock); transition(&sched->running_tasks, &sched->dead_tasks); + sched->lock.signal(); } void diff --git a/src/rt/sync/lock_and_signal.cpp b/src/rt/sync/lock_and_signal.cpp index 3d0d10138890..f4c3778837d0 100644 --- a/src/rt/sync/lock_and_signal.cpp +++ b/src/rt/sync/lock_and_signal.cpp @@ -10,7 +10,9 @@ #include "lock_and_signal.h" #if defined(__WIN32__) -lock_and_signal::lock_and_signal() { +lock_and_signal::lock_and_signal() + : alive(true) +{ // FIXME: In order to match the behavior of pthread_cond_broadcast on // Windows, we create manual reset events. This however breaks the // behavior of pthread_cond_signal, fixing this is quite involved: @@ -22,7 +24,7 @@ lock_and_signal::lock_and_signal() { #else lock_and_signal::lock_and_signal() - : _locked(false) + : _locked(false), alive(true) { CHECKED(pthread_cond_init(&_cond, NULL)); CHECKED(pthread_mutex_init(&_mutex, NULL)); @@ -36,6 +38,7 @@ lock_and_signal::~lock_and_signal() { CHECKED(pthread_cond_destroy(&_cond)); CHECKED(pthread_mutex_destroy(&_mutex)); #endif + alive = false; } void lock_and_signal::lock() { @@ -65,11 +68,14 @@ void lock_and_signal::wait() { timed_wait(0); } -void lock_and_signal::timed_wait(size_t timeout_in_ns) { +bool lock_and_signal::timed_wait(size_t timeout_in_ns) { + _locked = false; + bool rv = true; #if defined(__WIN32__) LeaveCriticalSection(&_cs); WaitForSingleObject(_event, INFINITE); EnterCriticalSection(&_cs); + _holding_thread = GetCurrentThreadId(); #else if (timeout_in_ns == 0) { CHECKED(pthread_cond_wait(&_cond, &_mutex)); @@ -79,9 +85,29 @@ void lock_and_signal::timed_wait(size_t timeout_in_ns) { timespec time_spec; time_spec.tv_sec = time_val.tv_sec + 0; time_spec.tv_nsec = time_val.tv_usec * 1000 + timeout_in_ns; - CHECKED(pthread_cond_timedwait(&_cond, &_mutex, &time_spec)); + if(time_spec.tv_nsec >= 1000000000) { + time_spec.tv_sec++; + time_spec.tv_nsec -= 1000000000; + } + int cond_wait_status + = pthread_cond_timedwait(&_cond, &_mutex, &time_spec); + switch(cond_wait_status) { + case 0: + // successfully grabbed the lock. + break; + case ETIMEDOUT: + // Oops, we timed out. + rv = false; + break; + default: + // Error + CHECKED(cond_wait_status); + } } + _holding_thread = pthread_self(); #endif + _locked = true; + return rv; } /** diff --git a/src/rt/sync/lock_and_signal.h b/src/rt/sync/lock_and_signal.h index 60c22958342f..6e656017115d 100644 --- a/src/rt/sync/lock_and_signal.h +++ b/src/rt/sync/lock_and_signal.h @@ -14,6 +14,9 @@ class lock_and_signal { pthread_t _holding_thread; #endif bool _locked; + + bool alive; + public: lock_and_signal(); virtual ~lock_and_signal(); @@ -21,7 +24,7 @@ public: void lock(); void unlock(); void wait(); - void timed_wait(size_t timeout_in_ns); + bool timed_wait(size_t timeout_in_ns); void signal(); void signal_all(); diff --git a/src/test/bench/task-perf-word-count.rs b/src/test/bench/task-perf-word-count.rs index 6e7e2bdba8aa..0a6b94c7f2ed 100644 --- a/src/test/bench/task-perf-word-count.rs +++ b/src/test/bench/task-perf-word-count.rs @@ -184,7 +184,6 @@ mod map_reduce { let m; ctrl |> m; - alt m { mapper_done. { // log_err "received mapper terminated.";