diff --git a/src/runtime/synchronization_common.h b/src/runtime/synchronization_common.h
index 778c423e4046..9b19dc92d8cc 100644
--- a/src/runtime/synchronization_common.h
+++ b/src/runtime/synchronization_common.h
@@ -834,6 +834,7 @@ class fast_cond {
 
     ALWAYS_INLINE void broadcast() {
         if_tsan_pre_signal(this);
+
         uintptr_t val;
         atomic_load_relaxed(&state, &val);
         if (val == 0) {
@@ -846,6 +847,7 @@ class fast_cond {
     }
 
     ALWAYS_INLINE void wait(fast_mutex *mutex) {
+        // Go to sleep until signaled.
         wait_parking_control control(&state, mutex);
         uintptr_t result = control.park((uintptr_t)this);
         if (result != (uintptr_t)mutex) {
diff --git a/src/runtime/thread_pool_common.h b/src/runtime/thread_pool_common.h
index b13427a4261c..961ed479060e 100644
--- a/src/runtime/thread_pool_common.h
+++ b/src/runtime/thread_pool_common.h
@@ -30,6 +30,57 @@ namespace Halide {
 namespace Runtime {
 namespace Internal {
 
+// A condition variable, augmented with a bit of spinning on an atomic counter
+// before going to sleep for real. This helps reduce overhead at the end of a
+// parallel for loop when idle worker threads are waiting for other threads to
+// finish so that the next parallel for loop can begin.
+struct halide_cond_with_spinning {
+    halide_cond cond;
+    uintptr_t counter;
+
+    void wait(halide_mutex *mutex) {
+        // First spin for a bit, watching for another thread to bump the
+        // counter.
+        uintptr_t initial;
+        Synchronization::atomic_load_relaxed(&counter, &initial);
+        halide_mutex_unlock(mutex);
+        for (int spin = 0; spin < 40; spin++) {
+            halide_thread_yield();
+            uintptr_t current;
+            Synchronization::atomic_load_relaxed(&counter, &current);
+            if (current != initial) {
+                halide_mutex_lock(mutex);
+                return;
+            }
+        }
+
+        // Give up on spinning and relock the mutex, preparing to sleep for real.
+        halide_mutex_lock(mutex);
+
+        // Check one final time with the lock held. This guarantees we won't
+        // miss an increment of the counter, because it is only ever incremented
+        // with the lock held.
+        uintptr_t current;
+        Synchronization::atomic_load_relaxed(&counter, &current);
+        if (current != initial) {
+            return;
+        }
+
+        halide_cond_wait(&cond, mutex);
+    }
+
+    void broadcast() {
+        // Release any spinning waiters.
+        Synchronization::atomic_fetch_add_acquire_release(&counter, (uintptr_t)1);
+
+        // Release any sleeping waiters.
+        halide_cond_broadcast(&cond);
+    }
+
+    // Note that this cond var variant doesn't have signal(), because it always
+    // wakes all spinning waiters.
+};
+
 struct work {
     halide_parallel_task_t task;
 
@@ -121,7 +172,7 @@ struct work_queue_t {
     // may want to wake them up independently. Any code that may
     // invalidate any of the reasons a worker or owner may have slept
     // must signal or broadcast the appropriate condition variable.
-    halide_cond wake_a_team, wake_b_team, wake_owners;
+    halide_cond_with_spinning wake_a_team, wake_b_team, wake_owners;
 
     // The number of sleeping workers and owners. An over-estimate - a
     // waking-up thread may not have decremented this yet.
@@ -203,9 +254,6 @@ WEAK void dump_job_state() {
 
 WEAK void worker_thread(void *);
 
 WEAK void worker_thread_already_locked(work *owned_job) {
-    int spin_count = 0;
-    const int max_spin_count = 40;
-
     while (owned_job ? owned_job->running() : !work_queue.shutdown) {
         work *job = work_queue.jobs;
         work **prev_ptr = &work_queue.jobs;
@@ -226,7 +274,7 @@ WEAK void worker_thread_already_locked(work *owned_job) {
                 // The wakeup can likely be done only under certain conditions, but it only happens
                 // when an error has already occurred, and it seems more important to ensure reliable
                 // termination than to optimize this path.
-                halide_cond_broadcast(&work_queue.wake_owners);
+                work_queue.wake_owners.broadcast();
                 continue;
             }
         }
@@ -283,38 +331,24 @@ WEAK void worker_thread_already_locked(work *owned_job) {
         if (!job) {
             // There is no runnable job. Go to sleep.
             if (owned_job) {
-                if (spin_count++ < max_spin_count) {
-                    // Give the workers a chance to finish up before sleeping
-                    halide_mutex_unlock(&work_queue.mutex);
-                    halide_thread_yield();
-                    halide_mutex_lock(&work_queue.mutex);
-                } else {
-                    work_queue.owners_sleeping++;
-                    owned_job->owner_is_sleeping = true;
-                    halide_cond_wait(&work_queue.wake_owners, &work_queue.mutex);
-                    owned_job->owner_is_sleeping = false;
-                    work_queue.owners_sleeping--;
-                }
+                work_queue.owners_sleeping++;
+                owned_job->owner_is_sleeping = true;
+                work_queue.wake_owners.wait(&work_queue.mutex);
+                owned_job->owner_is_sleeping = false;
+                work_queue.owners_sleeping--;
             } else {
                 work_queue.workers_sleeping++;
                 if (work_queue.a_team_size > work_queue.target_a_team_size) {
                     // Transition to B team
                     work_queue.a_team_size--;
-                    halide_cond_wait(&work_queue.wake_b_team, &work_queue.mutex);
+                    work_queue.wake_b_team.wait(&work_queue.mutex);
                     work_queue.a_team_size++;
-                } else if (spin_count++ < max_spin_count) {
-                    // Spin waiting for new work
-                    halide_mutex_unlock(&work_queue.mutex);
-                    halide_thread_yield();
-                    halide_mutex_lock(&work_queue.mutex);
                 } else {
-                    halide_cond_wait(&work_queue.wake_a_team, &work_queue.mutex);
+                    work_queue.wake_a_team.wait(&work_queue.mutex);
                 }
                 work_queue.workers_sleeping--;
             }
             continue;
-        } else {
-            spin_count = 0;
         }
 
         log_message("Working on job " << job->task.name);
@@ -432,7 +466,7 @@ WEAK void worker_thread_already_locked(work *owned_job) {
             if (wake_owners ||
                 (job->active_workers == 0 && (job->task.extent == 0 || job->exit_status != halide_error_code_success) && job->owner_is_sleeping)) {
                 // The job is done or some owned job failed via sibling linkage. Wake up the owner.
-                halide_cond_broadcast(&work_queue.wake_owners);
+                work_queue.wake_owners.broadcast();
             }
         }
     }
@@ -554,11 +588,11 @@ WEAK void enqueue_work_already_locked(int num_jobs, work *jobs, work *task_paren
         work_queue.target_a_team_size = workers_to_wake;
     }
 
-    halide_cond_broadcast(&work_queue.wake_a_team);
+    work_queue.wake_a_team.broadcast();
     if (work_queue.target_a_team_size > work_queue.a_team_size) {
-        halide_cond_broadcast(&work_queue.wake_b_team);
+        work_queue.wake_b_team.broadcast();
         if (stealable_jobs) {
-            halide_cond_broadcast(&work_queue.wake_owners);
+            work_queue.wake_owners.broadcast();
         }
     }
 
@@ -707,9 +741,9 @@ WEAK void halide_shutdown_thread_pool() {
         halide_mutex_lock(&work_queue.mutex);
         work_queue.shutdown = true;
 
-        halide_cond_broadcast(&work_queue.wake_owners);
-        halide_cond_broadcast(&work_queue.wake_a_team);
-        halide_cond_broadcast(&work_queue.wake_b_team);
+        work_queue.wake_owners.broadcast();
+        work_queue.wake_a_team.broadcast();
+        work_queue.wake_b_team.broadcast();
         halide_mutex_unlock(&work_queue.mutex);
 
         // Wait until they leave
@@ -739,8 +773,8 @@ WEAK int halide_default_semaphore_release(halide_semaphore_t *s, int n) {
     if (old_val == 0 && n != 0) {  // Don't wake if nothing released.
         // We may have just made a job runnable
         halide_mutex_lock(&work_queue.mutex);
-        halide_cond_broadcast(&work_queue.wake_a_team);
-        halide_cond_broadcast(&work_queue.wake_owners);
+        work_queue.wake_a_team.broadcast();
+        work_queue.wake_owners.broadcast();
         halide_mutex_unlock(&work_queue.mutex);
     }
     return old_val + n;
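
For reference, the idea behind halide_cond_with_spinning can be sketched outside the Halide runtime as follows. This is a minimal illustrative sketch using standard C++ primitives (std::mutex, std::condition_variable, std::atomic) rather than the runtime's halide_mutex/halide_cond and atomic helpers; the name cond_with_spinning is a hypothetical stand-in, not part of the patch.

    #include <atomic>
    #include <condition_variable>
    #include <cstdint>
    #include <mutex>
    #include <thread>

    struct cond_with_spinning {
        std::condition_variable cond;
        std::atomic<uintptr_t> counter{0};

        void wait(std::unique_lock<std::mutex> &lock) {
            // Snapshot the counter, then spin briefly without the lock held,
            // hoping a broadcast bumps the counter before we pay for a real sleep.
            uintptr_t initial = counter.load(std::memory_order_relaxed);
            lock.unlock();
            for (int spin = 0; spin < 40; spin++) {
                std::this_thread::yield();
                if (counter.load(std::memory_order_relaxed) != initial) {
                    lock.lock();
                    return;
                }
            }
            lock.lock();
            // Re-check with the lock held. broadcast() only runs with the lock
            // held, so an increment cannot slip in between this check and the
            // cond.wait() below, which would be a missed wakeup.
            if (counter.load(std::memory_order_relaxed) != initial) {
                return;
            }
            cond.wait(lock);  // may wake spuriously; callers re-check their predicate
        }

        // Must be called with the associated mutex held.
        void broadcast() {
            counter.fetch_add(1, std::memory_order_acq_rel);  // release spinners
            cond.notify_all();                                // release sleepers
        }
    };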
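
Usage follows the same protocol as the work queue above: waiters hold the mutex and re-check their predicate in a loop around wait(), and producers mutate shared state under the same mutex before broadcasting. A toy example against the sketch above (again hypothetical, not the Halide API):

    std::mutex m;
    cond_with_spinning have_work;
    int pending_jobs = 0;
    bool shutdown_requested = false;

    void worker() {
        std::unique_lock<std::mutex> lock(m);
        while (!shutdown_requested) {
            if (pending_jobs == 0) {
                have_work.wait(lock);  // spins first; sleeps only if no bump arrives
                continue;              // re-check the predicate after any wakeup
            }
            pending_jobs--;
            // ... drop the lock, run the job, re-take the lock ...
        }
    }

    void enqueue_job() {
        std::unique_lock<std::mutex> lock(m);
        pending_jobs++;
        have_work.broadcast();  // the counter bump happens with the lock held
    }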