From 0648f03133a899269a86cdd204d45582871bbb61 Mon Sep 17 00:00:00 2001 From: Ralf Jung Date: Mon, 6 Jun 2022 18:34:41 -0400 Subject: [PATCH] ensure all worker threads stay around --- ui_test/src/lib.rs | 42 ++++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/ui_test/src/lib.rs b/ui_test/src/lib.rs index 7f9eb48e05f9..8b7b4783fabb 100644 --- a/ui_test/src/lib.rs +++ b/ui_test/src/lib.rs @@ -1,3 +1,4 @@ +use std::collections::VecDeque; use std::fmt::Write; use std::path::{Path, PathBuf}; use std::process::{Command, ExitStatus}; @@ -6,7 +7,6 @@ use std::sync::Mutex; use colored::*; use comments::ErrorMatch; -use crossbeam::queue::SegQueue; use regex::Regex; use rustc_stderr::{Level, Message}; @@ -55,9 +55,8 @@ pub fn run_tests(config: Config) { // Get the triple with which to run the tests let target = config.target.clone().unwrap_or_else(|| config.get_host()); - // A queue for files or folders to process - let todo = SegQueue::new(); - todo.push(config.root_dir.clone()); + // A channel for files to process + let (submit, receive) = crossbeam::channel::unbounded(); // Some statistics and failure reports. let failures = Mutex::new(vec![]); @@ -66,20 +65,31 @@ pub fn run_tests(config: Config) { let filtered = AtomicUsize::default(); crossbeam::scope(|s| { + // Create a thread that is in charge of walking the directory and submitting jobs. + // It closes the channel when it is done. + s.spawn(|_| { + let mut todo = VecDeque::new(); + todo.push_back(config.root_dir.clone()); + while let Some(path) = todo.pop_front() { + if path.is_dir() { + // Enqueue everything inside this directory. + for entry in std::fs::read_dir(path).unwrap() { + todo.push_back(entry.unwrap().path()); + } + } else if path.extension().map(|ext| ext == "rs").unwrap_or(false) { + // Forward .rs files to the test workers. + submit.send(path).unwrap(); + } + } + // There will be no more jobs. This signals the workers to quit. + // (This also ensures `submit` is moved into this closure.) + drop(submit); + }); + + // Create N worker threads that receive files to test. for _ in 0..std::thread::available_parallelism().unwrap().get() { s.spawn(|_| { - while let Some(path) = todo.pop() { - // Collect everything inside directories - if path.is_dir() { - for entry in std::fs::read_dir(path).unwrap() { - todo.push(entry.unwrap().path()); - } - continue; - } - // Only look at .rs files - if !path.extension().map(|ext| ext == "rs").unwrap_or(false) { - continue; - } + for path in &receive { if !config.path_filter.is_empty() { let path_display = path.display().to_string(); if !config.path_filter.iter().any(|filter| path_display.contains(filter)) {