Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

watched_tasks: improve error handling in spawned tasks and threads #48

Merged
merged 4 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 15 additions & 42 deletions src/adc/iio/hardware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::path::Path;
use std::sync::atomic::{AtomicU16, AtomicU64, Ordering};
use std::time::{Duration, Instant};

use anyhow::{anyhow, Context, Error, Result};
use anyhow::{anyhow, Context, Result};
use async_std::channel::bounded;
use async_std::sync::Arc;

Expand Down Expand Up @@ -275,40 +275,27 @@ impl IioThread {
// setup was sucessful.
// This is why we create Self inside the thread and send it back
// to the calling thread via a queue.
let (thread_res_tx, thread_res_rx) = bounded(1);
let (thread_tx, thread_rx) = bounded(1);

// Spawn a high priority thread that updates the atomic values in `thread`.
wtb.spawn_thread(thread_name, move || {
let adc_setup_res = Self::adc_setup(
let (channels, mut buf) = Self::adc_setup(
adc_name,
trigger_name,
sample_rate,
channel_descs,
buffer_len,
);
let (thread, channels, mut buf) = match adc_setup_res {
Ok((channels, buf)) => {
let thread = Arc::new(Self {
ref_instant: Instant::now(),
timestamp: AtomicU64::new(TIMESTAMP_ERROR),
values: channels.iter().map(|_| AtomicU16::new(0)).collect(),
channel_descs,
});

(thread, channels, buf)
}
Err(e) => {
// Can not fail in practice as the queue is known to be empty
// at this point.
thread_res_tx
.try_send(Err(e))
.expect("Failed to signal ADC setup error due to full queue");
return Ok(());
}
};
)?;

let thread = Arc::new(Self {
ref_instant: Instant::now(),
timestamp: AtomicU64::new(TIMESTAMP_ERROR),
values: channels.iter().map(|_| AtomicU16::new(0)).collect(),
channel_descs,
});

let thread_weak = Arc::downgrade(&thread);
let mut signal_ready = Some((thread, thread_res_tx));
let mut signal_ready = Some((thread, thread_tx));

// Stop running as soon as the last reference to this Arc<IioThread>
// is dropped (e.g. the weak reference can no longer be upgraded).
Expand All @@ -318,18 +305,7 @@ impl IioThread {

error!("Failed to refill {} ADC buffer: {}", adc_name, e);

// If the ADC has not yet produced any values we still have the
// queue at hand that signals readiness to the main thread.
// This gives us a chance to return an Err from new().
// If the queue was already used just print an error instead.
if let Some((_, tx)) = signal_ready.take() {
// Can not fail in practice as the queue is only .take()n
// once and thus known to be empty.
tx.try_send(Err(Error::new(e)))
.expect("Failed to signal ADC setup error due to full queue");
}

break;
Err(e)?;
}

let values = channels.iter().map(|ch| {
Expand All @@ -356,17 +332,14 @@ impl IioThread {
if let Some((content, tx)) = signal_ready.take() {
// Can not fail in practice as the queue is only .take()n
// once and thus known to be empty.
tx.try_send(Ok(content))
.expect("Failed to signal ADC setup completion due to full queue");
tx.try_send(content)?;
}
}

Ok(())
})?;

let thread = thread_res_rx.recv().await??;

Ok(thread)
Ok(thread_rx.recv().await?)
}

pub async fn new_stm32(
Expand Down
2 changes: 1 addition & 1 deletion src/digital_io/gpio/demo_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct LineHandle {
}

impl LineHandle {
pub fn set_value(&self, val: u8) -> Result<(), ()> {
pub fn set_value(&self, val: u8) -> Result<()> {
// This does not actually set up any IIO things.
// It is just a hack to let adc/iio/demo_mode.rs
// communicate with this function so that toggling an output
Expand Down
2 changes: 1 addition & 1 deletion src/digital_io/gpio/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct LineHandle {
}

impl LineHandle {
pub fn set_value(&self, val: u8) -> Result<(), ()> {
pub fn set_value(&self, val: u8) -> Result<()> {
println!("GPIO simulation set {} to {}", self.name, val);
self.val.store(val, Ordering::Relaxed);
Ok(())
Expand Down
69 changes: 33 additions & 36 deletions src/dut_power.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ mod prio {
use thread_priority::*;

pub fn realtime_priority() -> Result<()> {
let prio = ThreadPriorityValue::try_from(10)
.map_err(|e| anyhow!("Failed to choose realtime priority level 10: {e:?}"))?;

set_thread_priority_and_policy(
thread_native_id(),
ThreadPriority::Crossplatform(ThreadPriorityValue::try_from(10).unwrap()),
ThreadPriority::Crossplatform(prio),
ThreadSchedulePolicy::Realtime(RealtimeThreadSchedulePolicy::Fifo),
)
.map_err(|e| anyhow!("Failed to set up realtime priority {e:?}"))
Expand Down Expand Up @@ -260,10 +263,12 @@ fn turn_off_with_reason(
pwr_line: &LineHandle,
discharge_line: &LineHandle,
fail_state: &AtomicU8,
) {
pwr_line.set_value(1 - PWR_LINE_ASSERTED).unwrap();
discharge_line.set_value(DISCHARGE_LINE_ASSERTED).unwrap();
) -> Result<()> {
pwr_line.set_value(1 - PWR_LINE_ASSERTED)?;
discharge_line.set_value(DISCHARGE_LINE_ASSERTED)?;
fail_state.store(reason as u8, Ordering::Relaxed);

Ok(())
}

/// Labgrid has a fixed assumption of how a REST based power port should work.
Expand Down Expand Up @@ -333,7 +338,7 @@ impl DutPwrThread {
// as well.
// Use a queue to notify the calling thread if the priority setup
// succeeded.
let (thread_res_tx, mut thread_res_rx) = bounded(1);
let (thread_tx, thread_rx) = bounded(1);

// Spawn a high priority thread that handles the power status
// in a realtimey fashion.
Expand All @@ -348,24 +353,20 @@ impl DutPwrThread {
let mut volt_filter = MedianFilter::<4>::new();
let mut curr_filter = MedianFilter::<4>::new();

let (tick_weak, request, state) = match realtime_priority() {
Ok(_) => {
let tick = Arc::new(AtomicU32::new(0));
let tick_weak = Arc::downgrade(&tick);
realtime_priority()?;

let request = Arc::new(AtomicU8::new(OutputRequest::Idle as u8));
let state = Arc::new(AtomicU8::new(OutputState::Off as u8));
let (tick_weak, request, state) = {
let tick = Arc::new(AtomicU32::new(0));
let tick_weak = Arc::downgrade(&tick);

thread_res_tx
.try_send(Ok((tick, request.clone(), state.clone())))
.unwrap();
let request = Arc::new(AtomicU8::new(OutputRequest::Idle as u8));
let state = Arc::new(AtomicU8::new(OutputState::Off as u8));

(tick_weak, request, state)
}
Err(e) => {
thread_res_tx.try_send(Err(e)).unwrap();
panic!()
}
thread_tx
.try_send((tick, request.clone(), state.clone()))
.expect("Queue that should be empty wasn't");

(tick_weak, request, state)
};

// The grace period contains the number of loop iterations until
Expand Down Expand Up @@ -406,7 +407,7 @@ impl DutPwrThread {
&pwr_line,
&discharge_line,
&state,
);
)?;
} else {
// We have a fresh ADC value. Signal "everything is well"
// to the watchdog task.
Expand Down Expand Up @@ -463,7 +464,7 @@ impl DutPwrThread {
&pwr_line,
&discharge_line,
&state,
);
)?;

continue;
}
Expand All @@ -474,7 +475,7 @@ impl DutPwrThread {
&pwr_line,
&discharge_line,
&state,
);
)?;

continue;
}
Expand All @@ -485,7 +486,7 @@ impl DutPwrThread {
&pwr_line,
&discharge_line,
&state,
);
)?;

continue;
}
Expand All @@ -496,34 +497,30 @@ impl DutPwrThread {
match req {
OutputRequest::Idle => {}
OutputRequest::On => {
discharge_line
.set_value(1 - DISCHARGE_LINE_ASSERTED)
.unwrap();
pwr_line.set_value(PWR_LINE_ASSERTED).unwrap();
discharge_line.set_value(1 - DISCHARGE_LINE_ASSERTED)?;
pwr_line.set_value(PWR_LINE_ASSERTED)?;
state.store(OutputState::On as u8, Ordering::Relaxed);
}
OutputRequest::Off => {
discharge_line.set_value(DISCHARGE_LINE_ASSERTED).unwrap();
pwr_line.set_value(1 - PWR_LINE_ASSERTED).unwrap();
discharge_line.set_value(DISCHARGE_LINE_ASSERTED)?;
pwr_line.set_value(1 - PWR_LINE_ASSERTED)?;
state.store(OutputState::Off as u8, Ordering::Relaxed);
}
OutputRequest::OffFloating => {
discharge_line
.set_value(1 - DISCHARGE_LINE_ASSERTED)
.unwrap();
pwr_line.set_value(1 - PWR_LINE_ASSERTED).unwrap();
discharge_line.set_value(1 - DISCHARGE_LINE_ASSERTED)?;
pwr_line.set_value(1 - PWR_LINE_ASSERTED)?;
state.store(OutputState::OffFloating as u8, Ordering::Relaxed);
}
}
}

// Make sure to enter fail safe mode before leaving the thread
turn_off_with_reason(OutputState::Off, &pwr_line, &discharge_line, &state);
turn_off_with_reason(OutputState::Off, &pwr_line, &discharge_line, &state)?;

Ok(())
})?;

let (tick, request, state) = thread_res_rx.next().await.unwrap()?;
let (tick, request, state) = thread_rx.recv().await?;

// The request and state topic use the same external path, this way one
// can e.g. publish "On" to the topic and be sure that the output is
Expand Down
Loading