Skip to content

Commit

Permalink
Thread utility methods
Browse files Browse the repository at this point in the history
  • Loading branch information
maneatingape committed Sep 7, 2024
1 parent f8a633b commit 420f718
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 110 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod util {
pub mod parse;
pub mod point;
pub mod slice;
pub mod thread;
}

/// # Help Santa by solving puzzles to fix the weather machine's snow function.
Expand Down
47 changes: 47 additions & 0 deletions src/util/thread.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//! Utility methods to spawn a number of
//! [scoped](https://doc.rust-lang.org/stable/std/thread/fn.scope.html)
//! threads equal to the number of cores on the machine. Unlike normal threads, scoped threads
//! can borrow data from their environment.
use std::thread::*;

/// Spawn `n` scoped threads, where `n` is the available parallelism.
pub fn spawn<F, T>(f: F)
where
    F: FnOnce() -> T + Copy + Send,
    T: Send,
{
    // Each thread receives its own copy of the closure. Scoped threads may borrow
    // from the caller's stack frame, since the scope joins them before returning.
    scope(|s| {
        (0..threads()).for_each(|_| {
            s.spawn(f);
        });
    });
}

/// Splits `items` into batches, one per thread. Items are assigned in a round robin fashion,
/// to achieve a crude load balancing in case some items are more complex to process than others.
pub fn spawn_batches<F, T, U>(items: Vec<U>, f: F)
where
    F: FnOnce(Vec<U>) -> T + Copy + Send,
    T: Send,
    U: Send,
{
    let threads = threads();
    // `vec![Vec::new(); threads]` would require `U: Clone`, so build the batches manually.
    let mut batches: Vec<_> = (0..threads).map(|_| Vec::new()).collect();

    // Round robin items over each thread. Iterating in reverse matches the order produced
    // by repeatedly popping from the back of the vector.
    for (index, next) in items.into_iter().rev().enumerate() {
        batches[index % threads].push(next);
    }

    scope(|scope| {
        for batch in batches {
            scope.spawn(move || f(batch));
        }
    });
}

/// Number of worker threads to spawn: one per core reported by the OS.
fn threads() -> usize {
    available_parallelism().map(|cores| cores.get()).unwrap()
}
14 changes: 6 additions & 8 deletions src/year2015/day04.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
//! [`MD5`]: crate::util::md5
//! [`format!`]: std::format
use crate::util::md5::*;
use crate::util::thread::*;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::thread;

pub struct Shared {
prefix: String,
Expand All @@ -45,13 +45,11 @@ pub fn parse(input: &str) -> Shared {
}

// Use as many cores as possible to parallelize the remaining search.
thread::scope(|scope| {
for _ in 0..thread::available_parallelism().unwrap().get() {
#[cfg(not(feature = "simd"))]
scope.spawn(|| worker(&shared));
#[cfg(feature = "simd")]
scope.spawn(|| simd::worker(&shared));
}
spawn(|| {
#[cfg(not(feature = "simd"))]
worker(&shared);
#[cfg(feature = "simd")]
simd::worker(&shared);
});

shared
Expand Down
14 changes: 6 additions & 8 deletions src/year2016/day05.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
//!
//! [`Year 2015 Day 4`]: crate::year2015::day04
use crate::util::md5::*;
use crate::util::thread::*;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Mutex;
use std::thread;

struct Shared {
prefix: String,
Expand Down Expand Up @@ -35,13 +35,11 @@ pub fn parse(input: &str) -> Vec<u32> {
}

// Use as many cores as possible to parallelize the remaining search.
thread::scope(|scope| {
for _ in 0..thread::available_parallelism().unwrap().get() {
#[cfg(not(feature = "simd"))]
scope.spawn(|| worker(&shared, &mutex));
#[cfg(feature = "simd")]
scope.spawn(|| simd::worker(&shared, &mutex));
}
spawn(|| {
#[cfg(not(feature = "simd"))]
worker(&shared, &mutex);
#[cfg(feature = "simd")]
simd::worker(&shared, &mutex);
});

let mut found = mutex.into_inner().unwrap().found;
Expand Down
8 changes: 2 additions & 6 deletions src/year2016/day14.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
//! Brute force slog through all possible keys, parallelized as much as possible. An optimization
//! for part two is a quick method to convert `u32` to 8 ASCII digits.
use crate::util::md5::*;
use crate::util::thread::*;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::sync::Mutex;
use std::thread;

/// Atomics can be safely shared between threads.
struct Shared<'a> {
Expand Down Expand Up @@ -44,11 +44,7 @@ fn generate_pad(input: &str, part_two: bool) -> i32 {
let mutex = Mutex::new(exclusive);

// Use as many cores as possible to parallelize the search.
thread::scope(|scope| {
for _ in 0..thread::available_parallelism().unwrap().get() {
scope.spawn(|| worker(&shared, &mutex, part_two));
}
});
spawn(|| worker(&shared, &mutex, part_two));

let exclusive = mutex.into_inner().unwrap();
*exclusive.found.iter().nth(63).unwrap()
Expand Down
8 changes: 2 additions & 6 deletions src/year2017/day14.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
//!
//! [`Day 10`]: crate::year2017::day10
//! [`Day 12`]: crate::year2017::day12
use crate::util::thread::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

/// Atomics can be safely shared between threads.
pub struct Shared {
Expand All @@ -27,11 +27,7 @@ pub fn parse(input: &str) -> Vec<u8> {
let mutex = Mutex::new(exclusive);

// Use as many cores as possible to parallelize the hashing.
thread::scope(|scope| {
for _ in 0..thread::available_parallelism().unwrap().get() {
scope.spawn(|| worker(&shared, &mutex));
}
});
spawn(|| worker(&shared, &mutex));

mutex.into_inner().unwrap().grid
}
Expand Down
51 changes: 21 additions & 30 deletions src/year2018/day11.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
//! This makes the total complexity `O(n³)`, however the calculation for each size is independent
//! so we can parallelize over multiple threads.
use crate::util::parse::*;
use crate::util::thread::*;
use std::sync::Mutex;
use std::thread;

pub struct Result {
x: usize,
Expand Down Expand Up @@ -38,36 +38,15 @@ pub fn parse(input: &str) -> Vec<Result> {
}

// Use as many cores as possible to parallelize the search.
let threads = thread::available_parallelism().unwrap().get();
// Smaller sizes take more time so keep batches roughly the same effort so that some
// threads are not finishing too soon and waiting idle, while others are still busy.
// For example if there are 4 cores, then they will be assigned sizes:
// * 1, 5, 9, ..
// * 2, 6, 10, ..
// * 3, 7, 11, ..
// * 4, 8, 12, ..
let mutex = Mutex::new(Vec::new());

thread::scope(|scope| {
for i in 0..threads {
// Shadow references in local variables so that they can be moved into closure.
let sat = &sat;
let mutex = &mutex;

// Smaller sizes take more time so keep batches roughly the same effort so that some
// threads are not finishing too soon and waiting idle, while others are still busy.
// For example if there are 4 cores, then they will be assigned sizes:
// * 1, 5, 9, ..
// * 2, 6, 10, ..
// * 3, 7, 11, ..
// * 4, 8, 12, ..
scope.spawn(move || {
let batch: Vec<_> = (1 + i..301)
.step_by(threads)
.map(|size| {
let (power, x, y) = square(sat, size);
Result { x, y, size, power }
})
.collect();

mutex.lock().unwrap().extend(batch);
});
}
});

spawn_batches((1..301).collect(), |batch| worker(batch, &sat, &mutex));
mutex.into_inner().unwrap()
}

Expand All @@ -81,6 +60,18 @@ pub fn part2(input: &[Result]) -> String {
format!("{x},{y},{size}")
}

/// Computes the best square for each size in `batch`, then appends all results
/// to the shared vector in a single lock acquisition.
fn worker(batch: Vec<usize>, sat: &[i32], mutex: &Mutex<Vec<Result>>) {
    let mut local = Vec::with_capacity(batch.len());

    for size in batch {
        let (power, x, y) = square(sat, size);
        local.push(Result { x, y, size, power });
    }

    mutex.lock().unwrap().extend(local);
}

/// Find the (x,y) coordinates and max power for a square of the specified size.
fn square(sat: &[i32], size: usize) -> (i32, usize, usize) {
let mut max_power = i32::MIN;
Expand Down
8 changes: 2 additions & 6 deletions src/year2018/day15.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@
//! Choosing the first intersection in reading order the Elf correctly moves left.
use crate::util::grid::*;
use crate::util::point::*;
use crate::util::thread::*;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::thread;

const READING_ORDER: [Point; 4] = [UP, LEFT, RIGHT, DOWN];

Expand Down Expand Up @@ -149,11 +149,7 @@ pub fn part2(input: &Input) -> i32 {
let shared = Shared { done: AtomicBool::new(false), elf_attack_power: AtomicI32::new(4), tx };

// Use as many cores as possible to parallelize the search.
thread::scope(|scope| {
for _ in 0..thread::available_parallelism().unwrap().get() {
scope.spawn(|| worker(input, &shared));
}
});
spawn(|| worker(input, &shared));

// Hang up the channel.
drop(shared.tx);
Expand Down
8 changes: 2 additions & 6 deletions src/year2018/day24.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
//! [`Day 15`]: crate::year2018::day15
use crate::util::hash::*;
use crate::util::parse::*;
use crate::util::thread::*;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::thread;

pub struct Input {
immune: Vec<Group>,
Expand Down Expand Up @@ -99,11 +99,7 @@ pub fn part2(input: &Input) -> i32 {
let shared = Shared { done: AtomicBool::new(false), boost: AtomicI32::new(1), tx };

// Use as many cores as possible to parallelize the search.
thread::scope(|scope| {
for _ in 0..thread::available_parallelism().unwrap().get() {
scope.spawn(|| worker(input, &shared));
}
});
spawn(|| worker(input, &shared));

// Hang up the channel.
drop(shared.tx);
Expand Down
18 changes: 4 additions & 14 deletions src/year2021/day18.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
//! The root node is stored at index 0. For a node at index `i` its left child is at index
//! `2i + 1`, right child at index `2i + 2` and parent at index `i / 2`. As leaf nodes are
//! always greater than or equal to zero, `-1` is used as a special sentinel value for non-leaf nodes.
use crate::util::thread::*;
use std::sync::Mutex;
use std::thread;

type Snailfish = [i32; 63];

Expand Down Expand Up @@ -83,20 +83,10 @@ pub fn part2(input: &[Snailfish]) -> i32 {
}
}

// Break the work into roughly equally size batches.
let threads = thread::available_parallelism().unwrap().get();
let size = pairs.len().div_ceil(threads);
let batches: Vec<_> = pairs.chunks(size).collect();

// Use as many cores as possible to parallelize the calculation.
// Use as many cores as possible to parallelize the calculation,
// breaking the work into roughly equally sized batches.
let mutex = Mutex::new(0);

thread::scope(|scope| {
for batch in batches {
scope.spawn(|| worker(batch, &mutex));
}
});

spawn_batches(pairs, |batch| worker(&batch, &mutex));
mutex.into_inner().unwrap()
}

Expand Down
8 changes: 2 additions & 6 deletions src/year2022/day11.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
//!
//! [`iter_unsigned`]: ParseOps::iter_unsigned
use crate::util::parse::*;
use crate::util::thread::*;
use std::sync::Mutex;
use std::thread;

pub struct Monkey {
items: Vec<u64>,
Expand Down Expand Up @@ -130,11 +130,7 @@ fn parallel(monkeys: &[Monkey], pairs: Vec<Pair>) -> Business {
let mutex = Mutex::new(exclusive);

// Use as many cores as possible to parallelize the calculation.
thread::scope(|scope| {
for _ in 0..thread::available_parallelism().unwrap().get() {
scope.spawn(|| worker(monkeys, &mutex));
}
});
spawn(|| worker(monkeys, &mutex));

mutex.into_inner().unwrap().business
}
Expand Down
18 changes: 4 additions & 14 deletions src/year2023/day12.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@
//! This is equivalent to the prefix sum approach described above but a little clearer to
//! understand however slower to calculate.
use crate::util::parse::*;
use crate::util::thread::*;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

type Spring<'a> = (&'a [u8], Vec<usize>);

Expand All @@ -141,20 +141,10 @@ pub fn part1(input: &[Spring<'_>]) -> u64 {
}

pub fn part2(input: &[Spring<'_>]) -> u64 {
// Break the work into roughly equally size batches.
let threads = thread::available_parallelism().unwrap().get();
let size = input.len().div_ceil(threads);
let batches: Vec<_> = input.chunks(size).collect();

// Use as many cores as possible to parallelize the calculation.
// Use as many cores as possible to parallelize the calculation,
// breaking the work into roughly equally sized batches.
let shared = AtomicU64::new(0);

thread::scope(|scope| {
for batch in batches {
scope.spawn(|| shared.fetch_add(solve(batch, 5), Ordering::Relaxed));
}
});

spawn_batches(input.to_vec(), |batch| shared.fetch_add(solve(&batch, 5), Ordering::Relaxed));
shared.load(Ordering::Relaxed)
}

Expand Down
Loading

0 comments on commit 420f718

Please sign in to comment.