diff --git a/src/lib.rs b/src/lib.rs
index 1ba7880..4c6c1d4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -23,6 +23,7 @@ pub mod util {
     pub mod parse;
     pub mod point;
     pub mod slice;
+    pub mod thread;
 }
 
 /// # Help Santa by solving puzzles to fix the weather machine's snow function.
diff --git a/src/util/thread.rs b/src/util/thread.rs
new file mode 100644
index 0000000..d5f974c
--- /dev/null
+++ b/src/util/thread.rs
@@ -0,0 +1,47 @@
+//! Utility methods to spawn a number of
+//! [scoped](https://doc.rust-lang.org/stable/std/thread/fn.scope.html)
+//! threads equal to the number of cores on the machine. Unlike normal threads, scoped threads
+//! can borrow data from their environment.
+use std::thread::*;
+
+/// Spawns `n` scoped threads, where `n` is the available parallelism.
+pub fn spawn<F, T>(f: F)
+where
+    F: FnOnce() -> T + Copy + Send,
+    T: Send,
+{
+    scope(|scope| {
+        for _ in 0..threads() {
+            scope.spawn(f);
+        }
+    });
+}
+
+/// Splits `items` into batches, one per thread. Items are assigned in a round robin fashion
+/// to achieve crude load balancing, in case some items are more complex to process than others.
+pub fn spawn_batches<F, T, U>(mut items: Vec<U>, f: F)
+where
+    F: FnOnce(Vec<U>) -> T + Copy + Send,
+    T: Send,
+    U: Send,
+{
+    let threads = threads();
+    let mut batches: Vec<_> = (0..threads).map(|_| Vec::new()).collect();
+    let mut index = 0;
+
+    // Round robin items over each thread.
+    while let Some(next) = items.pop() {
+        batches[index % threads].push(next);
+        index += 1;
+    }
+
+    scope(|scope| {
+        for batch in batches {
+            scope.spawn(move || f(batch));
+        }
+    });
+}
+
+fn threads() -> usize {
+    available_parallelism().unwrap().get()
+}
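To illustrate the new API, here is a hypothetical caller, written as if it lived inside this crate. The function and its input are invented for the example; what it shows is the pattern the refactored solutions below rely on, sharing an atomic accumulator across scoped threads:

```rust
use crate::util::thread::*;
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical example: sum x² over all items using every core.
fn sum_of_squares(items: Vec<u64>) -> u64 {
    let total = AtomicU64::new(0);

    // Scoped threads may borrow `total` from this stack frame.
    spawn_batches(items, |batch: Vec<u64>| {
        let partial: u64 = batch.iter().map(|x| x * x).sum();
        total.fetch_add(partial, Ordering::Relaxed);
    });

    // `spawn_batches` only returns once every thread has joined,
    // so no other thread can still be writing.
    total.load(Ordering::Relaxed)
}
```

Because `scope` joins all threads before returning, the accumulator can be read afterwards without any further synchronization.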
diff --git a/src/year2015/day04.rs b/src/year2015/day04.rs
index 9bac95c..8c5a44d 100644
--- a/src/year2015/day04.rs
+++ b/src/year2015/day04.rs
@@ -18,8 +18,8 @@
 //! [`MD5`]: crate::util::md5
 //! [`format!`]: std::format
 use crate::util::md5::*;
+use crate::util::thread::*;
 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
-use std::thread;
 
 pub struct Shared {
     prefix: String,
@@ -45,13 +45,11 @@ pub fn parse(input: &str) -> Shared {
     }
 
     // Use as many cores as possible to parallelize the remaining search.
-    thread::scope(|scope| {
-        for _ in 0..thread::available_parallelism().unwrap().get() {
-            #[cfg(not(feature = "simd"))]
-            scope.spawn(|| worker(&shared));
-            #[cfg(feature = "simd")]
-            scope.spawn(|| simd::worker(&shared));
-        }
+    spawn(|| {
+        #[cfg(not(feature = "simd"))]
+        worker(&shared);
+        #[cfg(feature = "simd")]
+        simd::worker(&shared);
     });
 
     shared
diff --git a/src/year2016/day05.rs b/src/year2016/day05.rs
index d5c8d52..1341ab1 100644
--- a/src/year2016/day05.rs
+++ b/src/year2016/day05.rs
@@ -5,9 +5,9 @@
 //!
 //! [`Year 2015 Day 4`]: crate::year2015::day04
 use crate::util::md5::*;
+use crate::util::thread::*;
 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
 use std::sync::Mutex;
-use std::thread;
 
 struct Shared {
     prefix: String,
@@ -35,13 +35,11 @@ pub fn parse(input: &str) -> Vec<u32> {
     }
 
     // Use as many cores as possible to parallelize the remaining search.
-    thread::scope(|scope| {
-        for _ in 0..thread::available_parallelism().unwrap().get() {
-            #[cfg(not(feature = "simd"))]
-            scope.spawn(|| worker(&shared, &mutex));
-            #[cfg(feature = "simd")]
-            scope.spawn(|| simd::worker(&shared, &mutex));
-        }
+    spawn(|| {
+        #[cfg(not(feature = "simd"))]
+        worker(&shared, &mutex);
+        #[cfg(feature = "simd")]
+        simd::worker(&shared, &mutex);
     });
 
     let mut found = mutex.into_inner().unwrap().found;
diff --git a/src/year2016/day14.rs b/src/year2016/day14.rs
index 0908616..ca0922a 100644
--- a/src/year2016/day14.rs
+++ b/src/year2016/day14.rs
@@ -3,10 +3,10 @@
 //! Brute force slog through all possible keys, parallelized as much as possible. An optimization
 //! for part two is a quick method to convert `u32` to 8 ASCII digits.
 use crate::util::md5::*;
+use crate::util::thread::*;
 use std::collections::{BTreeMap, BTreeSet};
 use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
 use std::sync::Mutex;
-use std::thread;
 
 /// Atomics can be safely shared between threads.
 struct Shared<'a> {
@@ -44,11 +44,7 @@ fn generate_pad(input: &str, part_two: bool) -> i32 {
     let mutex = Mutex::new(exclusive);
 
     // Use as many cores as possible to parallelize the search.
-    thread::scope(|scope| {
-        for _ in 0..thread::available_parallelism().unwrap().get() {
-            scope.spawn(|| worker(&shared, &mutex, part_two));
-        }
-    });
+    spawn(|| worker(&shared, &mutex, part_two));
 
     let exclusive = mutex.into_inner().unwrap();
     *exclusive.found.iter().nth(63).unwrap()
diff --git a/src/year2017/day14.rs b/src/year2017/day14.rs
index 9fff22d..b9f59f8 100644
--- a/src/year2017/day14.rs
+++ b/src/year2017/day14.rs
@@ -5,9 +5,9 @@
 //!
 //! [`Day 10`]: crate::year2017::day10
 //! [`Day 12`]: crate::year2017::day12
+use crate::util::thread::*;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Mutex;
-use std::thread;
 
 /// Atomics can be safely shared between threads.
 pub struct Shared {
@@ -27,11 +27,7 @@ pub fn parse(input: &str) -> Vec<u8> {
     let mutex = Mutex::new(exclusive);
 
     // Use as many cores as possible to parallelize the hashing.
-    thread::scope(|scope| {
-        for _ in 0..thread::available_parallelism().unwrap().get() {
-            scope.spawn(|| worker(&shared, &mutex));
-        }
-    });
+    spawn(|| worker(&shared, &mutex));
 
     mutex.into_inner().unwrap().grid
 }
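The four MD5-style searches above all share one shape: a `Shared` struct of atomics, identical workers on every core claiming blocks of candidates through an atomic counter, and a flag that stops everyone once the answer is found. A condensed sketch of that pattern, with hypothetical fields and a stand-in workload in place of the real hash testing:

```rust
use crate::util::thread::*;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};

// Hypothetical shared state; the real solutions add puzzle-specific fields.
struct Shared {
    done: AtomicBool,
    counter: AtomicU32,
}

fn search() -> u32 {
    let shared = Shared { done: AtomicBool::new(false), counter: AtomicU32::new(0) };

    // Every core runs an identical worker; `spawn` joins them all before returning.
    spawn(|| worker(&shared));

    // How far the counter advanced before the flag was raised.
    shared.counter.load(Ordering::Relaxed)
}

fn worker(shared: &Shared) {
    while !shared.done.load(Ordering::Relaxed) {
        // Atomically claim the next block of 1000 candidates.
        let start = shared.counter.fetch_add(1000, Ordering::Relaxed);

        // Stand-in for testing hashes `start..start + 1000` and
        // raising the flag when the target is found.
        if start >= 1_000_000 {
            shared.done.store(true, Ordering::Relaxed);
        }
    }
}
```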
diff --git a/src/year2018/day11.rs b/src/year2018/day11.rs
index 208bc54..ad141ce 100644
--- a/src/year2018/day11.rs
+++ b/src/year2018/day11.rs
@@ -6,8 +6,8 @@
 //! This makes the total complexity `O(n³)`, however the calculation for each size is independent
 //! so we can parallelize over multiple threads.
 use crate::util::parse::*;
+use crate::util::thread::*;
 use std::sync::Mutex;
-use std::thread;
 
 pub struct Result {
     x: usize,
@@ -38,36 +38,15 @@ pub fn parse(input: &str) -> Vec<Result> {
     }
 
     // Use as many cores as possible to parallelize the search.
-    let threads = thread::available_parallelism().unwrap().get();
+    // Smaller sizes take more time, so keep each batch roughly the same total effort,
+    // preventing some threads from finishing early and idling while others are still busy.
+    // For example if there are 4 cores, then they will be assigned sizes:
+    // * 1, 5, 9, ..
+    // * 2, 6, 10, ..
+    // * 3, 7, 11, ..
+    // * 4, 8, 12, ..
     let mutex = Mutex::new(Vec::new());
-
-    thread::scope(|scope| {
-        for i in 0..threads {
-            // Shadow references in local variables so that they can be moved into closure.
-            let sat = &sat;
-            let mutex = &mutex;
-
-            // Smaller sizes take more time so keep batches roughly the same effort so that some
-            // threads are not finishing too soon and waiting idle, while others are still busy.
-            // For example if there are 4 cores, then they will be assigned sizes:
-            // * 1, 5, 9, ..
-            // * 2, 6, 10, ..
-            // * 3, 7, 11, ..
-            // * 4, 8, 12, ..
-            scope.spawn(move || {
-                let batch: Vec<_> = (1 + i..301)
-                    .step_by(threads)
-                    .map(|size| {
-                        let (power, x, y) = square(sat, size);
-                        Result { x, y, size, power }
-                    })
-                    .collect();
-
-                mutex.lock().unwrap().extend(batch);
-            });
-        }
-    });
-
+    spawn_batches((1..301).collect(), |batch| worker(batch, &sat, &mutex));
     mutex.into_inner().unwrap()
 }
@@ -81,6 +60,18 @@ pub fn part2(input: &[Result]) -> String {
     format!("{x},{y},{size}")
 }
 
+fn worker(batch: Vec<usize>, sat: &[i32], mutex: &Mutex<Vec<Result>>) {
+    let result: Vec<_> = batch
+        .into_iter()
+        .map(|size| {
+            let (power, x, y) = square(sat, size);
+            Result { x, y, size, power }
+        })
+        .collect();
+
+    mutex.lock().unwrap().extend(result);
+}
+
 /// Find the (x,y) coordinates and max power for a square of the specified size.
 fn square(sat: &[i32], size: usize) -> (i32, usize, usize) {
     let mut max_power = i32::MIN;
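The round-robin assignment described in the new comment can be replayed with a throwaway snippet mirroring the distribution loop inside `spawn_batches`. Note that `pop` drains from the back, so each batch collects its sizes in descending order:

```rust
fn main() {
    let threads = 4;
    let mut items: Vec<u32> = (1..=12).collect();
    let mut batches: Vec<Vec<u32>> = (0..threads).map(|_| Vec::new()).collect();
    let mut index = 0;

    // Same round-robin loop as `spawn_batches`.
    while let Some(next) = items.pop() {
        batches[index % threads].push(next);
        index += 1;
    }

    // Prints [12, 8, 4], [11, 7, 3], [10, 6, 2], [9, 5, 1]:
    // every batch mixes cheap and expensive sizes.
    for batch in &batches {
        println!("{batch:?}");
    }
}
```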
diff --git a/src/year2018/day15.rs b/src/year2018/day15.rs
index ac8f5ca..57b852a 100644
--- a/src/year2018/day15.rs
+++ b/src/year2018/day15.rs
@@ -78,9 +78,9 @@
 //! Choosing the first intersection in reading order the Elf correctly moves left.
 use crate::util::grid::*;
 use crate::util::point::*;
+use crate::util::thread::*;
 use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
 use std::sync::mpsc::{channel, Sender};
-use std::thread;
 
 const READING_ORDER: [Point; 4] = [UP, LEFT, RIGHT, DOWN];
@@ -149,11 +149,7 @@ pub fn part2(input: &Input) -> i32 {
     let shared = Shared { done: AtomicBool::new(false), elf_attack_power: AtomicI32::new(4), tx };
 
     // Use as many cores as possible to parallelize the search.
-    thread::scope(|scope| {
-        for _ in 0..thread::available_parallelism().unwrap().get() {
-            scope.spawn(|| worker(input, &shared));
-        }
-    });
+    spawn(|| worker(input, &shared));
 
     // Hang up the channel.
     drop(shared.tx);
diff --git a/src/year2018/day24.rs b/src/year2018/day24.rs
index 80ba8c3..b5afab2 100644
--- a/src/year2018/day24.rs
+++ b/src/year2018/day24.rs
@@ -9,9 +9,9 @@
 //! [`Day 15`]: crate::year2018::day15
 use crate::util::hash::*;
 use crate::util::parse::*;
+use crate::util::thread::*;
 use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
 use std::sync::mpsc::{channel, Sender};
-use std::thread;
 
 pub struct Input {
     immune: Vec<Group>,
@@ -99,11 +99,7 @@ pub fn part2(input: &Input) -> i32 {
     let shared = Shared { done: AtomicBool::new(false), boost: AtomicI32::new(1), tx };
 
     // Use as many cores as possible to parallelize the search.
-    thread::scope(|scope| {
-        for _ in 0..thread::available_parallelism().unwrap().get() {
-            scope.spawn(|| worker(input, &shared));
-        }
-    });
+    spawn(|| worker(input, &shared));
 
     // Hang up the channel.
     drop(shared.tx);
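Day 15 and Day 24 layer an mpsc channel on top of `spawn`: workers claim candidate values from an atomic counter and send every result to the parent, which hangs up the channel and reduces. A condensed sketch of that pattern with a stand-in computation (sharing `&shared.tx` across threads works because `mpsc::Sender` implements `Sync` since Rust 1.72):

```rust
use crate::util::thread::*;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::mpsc::{channel, Sender};

// Hypothetical shared state: a work counter plus a channel for results.
struct Shared {
    next: AtomicI32,
    tx: Sender<(i32, i32)>,
}

fn minimum_score() -> i32 {
    let (tx, rx) = channel();
    let shared = Shared { next: AtomicI32::new(1), tx };

    spawn(|| worker(&shared));

    // Hang up the channel so that the receiver iterator below terminates.
    drop(shared.tx);
    rx.iter().map(|(_, score)| score).min().unwrap()
}

fn worker(shared: &Shared) {
    loop {
        // Atomically claim the next candidate value.
        let candidate = shared.next.fetch_add(1, Ordering::Relaxed);
        if candidate > 20 {
            break;
        }
        // Stand-in for the expensive simulation.
        let _ = shared.tx.send((candidate, candidate * candidate));
    }
}
```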
diff --git a/src/year2021/day18.rs b/src/year2021/day18.rs
index d67c2d9..e24c461 100644
--- a/src/year2021/day18.rs
+++ b/src/year2021/day18.rs
@@ -27,8 +27,8 @@
 //! The root node is stored at index 0. For a node at index `i` its left child is at index
 //! `2i + 1`, right child at index `2i + 2` and parent at index `i / 2`. As leaf nodes are
 //! always greater than or equal to zero, `-1` is used as a special sentinel value for non-leaf nodes.
+use crate::util::thread::*;
 use std::sync::Mutex;
-use std::thread;
 
 type Snailfish = [i32; 63];
@@ -83,20 +83,10 @@ pub fn part2(input: &[Snailfish]) -> i32 {
         }
     }
 
-    // Break the work into roughly equally size batches.
-    let threads = thread::available_parallelism().unwrap().get();
-    let size = pairs.len().div_ceil(threads);
-    let batches: Vec<_> = pairs.chunks(size).collect();
-
-    // Use as many cores as possible to parallelize the calculation.
+    // Use as many cores as possible to parallelize the calculation,
+    // breaking the work into roughly equally sized batches.
     let mutex = Mutex::new(0);
-
-    thread::scope(|scope| {
-        for batch in batches {
-            scope.spawn(|| worker(batch, &mutex));
-        }
-    });
-
+    spawn_batches(pairs, |batch| worker(&batch, &mutex));
     mutex.into_inner().unwrap()
 }
diff --git a/src/year2022/day11.rs b/src/year2022/day11.rs
index 597f94d..b9d52b7 100644
--- a/src/year2022/day11.rs
+++ b/src/year2022/day11.rs
@@ -42,8 +42,8 @@
 //!
 //! [`iter_unsigned`]: ParseOps::iter_unsigned
 use crate::util::parse::*;
+use crate::util::thread::*;
 use std::sync::Mutex;
-use std::thread;
 
 pub struct Monkey {
     items: Vec<u64>,
@@ -130,11 +130,7 @@ fn parallel(monkeys: &[Monkey], pairs: Vec<(usize, u64)>) -> Business {
     let mutex = Mutex::new(exclusive);
 
     // Use as many cores as possible to parallelize the calculation.
-    thread::scope(|scope| {
-        for _ in 0..thread::available_parallelism().unwrap().get() {
-            scope.spawn(|| worker(monkeys, &mutex));
-        }
-    });
+    spawn(|| worker(monkeys, &mutex));
 
     mutex.into_inner().unwrap().business
 }
diff --git a/src/year2023/day12.rs b/src/year2023/day12.rs
index 06c8b7b..3150d3c 100644
--- a/src/year2023/day12.rs
+++ b/src/year2023/day12.rs
@@ -119,8 +119,8 @@
 //! This is equivalent to the prefix sum approach described above but a little clearer to
 //! understand however slower to calculate.
 use crate::util::parse::*;
+use crate::util::thread::*;
 use std::sync::atomic::{AtomicU64, Ordering};
-use std::thread;
 
 type Spring<'a> = (&'a [u8], Vec<u32>);
@@ -141,20 +141,10 @@ pub fn part1(input: &[Spring<'_>]) -> u64 {
 }
 
 pub fn part2(input: &[Spring<'_>]) -> u64 {
-    // Break the work into roughly equally size batches.
-    let threads = thread::available_parallelism().unwrap().get();
-    let size = input.len().div_ceil(threads);
-    let batches: Vec<_> = input.chunks(size).collect();
-
-    // Use as many cores as possible to parallelize the calculation.
+    // Use as many cores as possible to parallelize the calculation,
+    // breaking the work into roughly equally sized batches.
     let shared = AtomicU64::new(0);
-
-    thread::scope(|scope| {
-        for batch in batches {
-            scope.spawn(|| shared.fetch_add(solve(batch, 5), Ordering::Relaxed));
-        }
-    });
-
+    spawn_batches(input.to_vec(), |batch| shared.fetch_add(solve(&batch, 5), Ordering::Relaxed));
     shared.load(Ordering::Relaxed)
 }
diff --git a/src/year2023/day16.rs b/src/year2023/day16.rs
index d91b5d5..f83bdb5 100644
--- a/src/year2023/day16.rs
+++ b/src/year2023/day16.rs
@@ -13,10 +13,10 @@
 //! up the search.
 use crate::util::grid::*;
 use crate::util::point::*;
+use crate::util::thread::*;
 use std::collections::VecDeque;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Mutex;
-use std::thread;
 
 type Pair = (Point, u32);
@@ -132,11 +132,7 @@ pub fn part2(input: &Input) -> usize {
     let mutex = Mutex::new(exclusive);
 
     // Use as many cores as possible to parallelize the search.
-    thread::scope(|scope| {
-        for _ in 0..thread::available_parallelism().unwrap().get() {
-            scope.spawn(|| worker(input, &shared, &mutex));
-        }
-    });
+    spawn(|| worker(input, &shared, &mutex));
 
     shared.tiles.load(Ordering::Relaxed)
 }
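Finally, Day 16 combines both styles of shared state: an atomic for the running result plus a mutex guarding remaining work. A loose sketch of that shape, with an invented job type and scoring standing in for the real beam-tracing search:

```rust
use crate::util::thread::*;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

// Hypothetical: find the maximum score over a queue of starting jobs.
fn maximum_score(jobs: Vec<u32>) -> usize {
    let tiles = AtomicUsize::new(0);
    let queue = Mutex::new(VecDeque::from(jobs));

    spawn(|| {
        loop {
            // Take the lock only long enough to claim the next job.
            let job = queue.lock().unwrap().pop_front();
            let Some(job) = job else { break };

            // Stand-in for the real search from this starting position.
            let score = job as usize * 2;
            tiles.fetch_max(score, Ordering::Relaxed);
        }
    });

    tiles.load(Ordering::Relaxed)
}
```

As throughout this change, `spawn` joins every thread before returning, so exclusive results can also be taken out of a `Mutex` afterwards with `into_inner` instead of locking, exactly as the refactored solutions do with `mutex.into_inner().unwrap()`.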