diff --git a/src/adc/iio/hardware.rs b/src/adc/iio/hardware.rs index d903370..fcbfcbd 100644 --- a/src/adc/iio/hardware.rs +++ b/src/adc/iio/hardware.rs @@ -22,7 +22,7 @@ use std::path::Path; use std::sync::atomic::{AtomicU16, AtomicU64, Ordering}; use std::time::{Duration, Instant}; -use anyhow::{anyhow, Context, Error, Result}; +use anyhow::{anyhow, Context, Result}; use async_std::channel::bounded; use async_std::sync::Arc; @@ -275,40 +275,27 @@ impl IioThread { // setup was sucessful. // This is why we create Self inside the thread and send it back // to the calling thread via a queue. - let (thread_res_tx, thread_res_rx) = bounded(1); + let (thread_tx, thread_rx) = bounded(1); // Spawn a high priority thread that updates the atomic values in `thread`. wtb.spawn_thread(thread_name, move || { - let adc_setup_res = Self::adc_setup( + let (channels, mut buf) = Self::adc_setup( adc_name, trigger_name, sample_rate, channel_descs, buffer_len, - ); - let (thread, channels, mut buf) = match adc_setup_res { - Ok((channels, buf)) => { - let thread = Arc::new(Self { - ref_instant: Instant::now(), - timestamp: AtomicU64::new(TIMESTAMP_ERROR), - values: channels.iter().map(|_| AtomicU16::new(0)).collect(), - channel_descs, - }); - - (thread, channels, buf) - } - Err(e) => { - // Can not fail in practice as the queue is known to be empty - // at this point. - thread_res_tx - .try_send(Err(e)) - .expect("Failed to signal ADC setup error due to full queue"); - return Ok(()); - } - }; + )?; + + let thread = Arc::new(Self { + ref_instant: Instant::now(), + timestamp: AtomicU64::new(TIMESTAMP_ERROR), + values: channels.iter().map(|_| AtomicU16::new(0)).collect(), + channel_descs, + }); let thread_weak = Arc::downgrade(&thread); - let mut signal_ready = Some((thread, thread_res_tx)); + let mut signal_ready = Some((thread, thread_tx)); // Stop running as soon as the last reference to this Arc // is dropped (e.g. the weak reference can no longer be upgraded). @@ -318,18 +305,7 @@ impl IioThread { error!("Failed to refill {} ADC buffer: {}", adc_name, e); - // If the ADC has not yet produced any values we still have the - // queue at hand that signals readiness to the main thread. - // This gives us a chance to return an Err from new(). - // If the queue was already used just print an error instead. - if let Some((_, tx)) = signal_ready.take() { - // Can not fail in practice as the queue is only .take()n - // once and thus known to be empty. - tx.try_send(Err(Error::new(e))) - .expect("Failed to signal ADC setup error due to full queue"); - } - - break; + Err(e)?; } let values = channels.iter().map(|ch| { @@ -356,17 +332,14 @@ impl IioThread { if let Some((content, tx)) = signal_ready.take() { // Can not fail in practice as the queue is only .take()n // once and thus known to be empty. - tx.try_send(Ok(content)) - .expect("Failed to signal ADC setup completion due to full queue"); + tx.try_send(content)?; } } Ok(()) })?; - let thread = thread_res_rx.recv().await??; - - Ok(thread) + Ok(thread_rx.recv().await?) } pub async fn new_stm32( diff --git a/src/digital_io/gpio/demo_mode.rs b/src/digital_io/gpio/demo_mode.rs index b9889eb..dc54c37 100644 --- a/src/digital_io/gpio/demo_mode.rs +++ b/src/digital_io/gpio/demo_mode.rs @@ -27,7 +27,7 @@ pub struct LineHandle { } impl LineHandle { - pub fn set_value(&self, val: u8) -> Result<(), ()> { + pub fn set_value(&self, val: u8) -> Result<()> { // This does not actually set up any IIO things. // It is just a hack to let adc/iio/demo_mode.rs // communicate with this function so that toggling an output diff --git a/src/digital_io/gpio/test.rs b/src/digital_io/gpio/test.rs index db5b147..8b25b59 100644 --- a/src/digital_io/gpio/test.rs +++ b/src/digital_io/gpio/test.rs @@ -32,7 +32,7 @@ pub struct LineHandle { } impl LineHandle { - pub fn set_value(&self, val: u8) -> Result<(), ()> { + pub fn set_value(&self, val: u8) -> Result<()> { println!("GPIO simulation set {} to {}", self.name, val); self.val.store(val, Ordering::Relaxed); Ok(()) diff --git a/src/dut_power.rs b/src/dut_power.rs index a95623f..c37d983 100644 --- a/src/dut_power.rs +++ b/src/dut_power.rs @@ -50,9 +50,12 @@ mod prio { use thread_priority::*; pub fn realtime_priority() -> Result<()> { + let prio = ThreadPriorityValue::try_from(10) + .map_err(|e| anyhow!("Failed to choose realtime priority level 10: {e:?}"))?; + set_thread_priority_and_policy( thread_native_id(), - ThreadPriority::Crossplatform(ThreadPriorityValue::try_from(10).unwrap()), + ThreadPriority::Crossplatform(prio), ThreadSchedulePolicy::Realtime(RealtimeThreadSchedulePolicy::Fifo), ) .map_err(|e| anyhow!("Failed to set up realtime priority {e:?}")) @@ -260,10 +263,12 @@ fn turn_off_with_reason( pwr_line: &LineHandle, discharge_line: &LineHandle, fail_state: &AtomicU8, -) { - pwr_line.set_value(1 - PWR_LINE_ASSERTED).unwrap(); - discharge_line.set_value(DISCHARGE_LINE_ASSERTED).unwrap(); +) -> Result<()> { + pwr_line.set_value(1 - PWR_LINE_ASSERTED)?; + discharge_line.set_value(DISCHARGE_LINE_ASSERTED)?; fail_state.store(reason as u8, Ordering::Relaxed); + + Ok(()) } /// Labgrid has a fixed assumption of how a REST based power port should work. @@ -333,7 +338,7 @@ impl DutPwrThread { // as well. // Use a queue to notify the calling thread if the priority setup // succeeded. - let (thread_res_tx, mut thread_res_rx) = bounded(1); + let (thread_tx, thread_rx) = bounded(1); // Spawn a high priority thread that handles the power status // in a realtimey fashion. @@ -348,24 +353,20 @@ impl DutPwrThread { let mut volt_filter = MedianFilter::<4>::new(); let mut curr_filter = MedianFilter::<4>::new(); - let (tick_weak, request, state) = match realtime_priority() { - Ok(_) => { - let tick = Arc::new(AtomicU32::new(0)); - let tick_weak = Arc::downgrade(&tick); + realtime_priority()?; - let request = Arc::new(AtomicU8::new(OutputRequest::Idle as u8)); - let state = Arc::new(AtomicU8::new(OutputState::Off as u8)); + let (tick_weak, request, state) = { + let tick = Arc::new(AtomicU32::new(0)); + let tick_weak = Arc::downgrade(&tick); - thread_res_tx - .try_send(Ok((tick, request.clone(), state.clone()))) - .unwrap(); + let request = Arc::new(AtomicU8::new(OutputRequest::Idle as u8)); + let state = Arc::new(AtomicU8::new(OutputState::Off as u8)); - (tick_weak, request, state) - } - Err(e) => { - thread_res_tx.try_send(Err(e)).unwrap(); - panic!() - } + thread_tx + .try_send((tick, request.clone(), state.clone())) + .expect("Queue that should be empty wasn't"); + + (tick_weak, request, state) }; // The grace period contains the number of loop iterations until @@ -406,7 +407,7 @@ impl DutPwrThread { &pwr_line, &discharge_line, &state, - ); + )?; } else { // We have a fresh ADC value. Signal "everything is well" // to the watchdog task. @@ -463,7 +464,7 @@ impl DutPwrThread { &pwr_line, &discharge_line, &state, - ); + )?; continue; } @@ -474,7 +475,7 @@ impl DutPwrThread { &pwr_line, &discharge_line, &state, - ); + )?; continue; } @@ -485,7 +486,7 @@ impl DutPwrThread { &pwr_line, &discharge_line, &state, - ); + )?; continue; } @@ -496,34 +497,30 @@ impl DutPwrThread { match req { OutputRequest::Idle => {} OutputRequest::On => { - discharge_line - .set_value(1 - DISCHARGE_LINE_ASSERTED) - .unwrap(); - pwr_line.set_value(PWR_LINE_ASSERTED).unwrap(); + discharge_line.set_value(1 - DISCHARGE_LINE_ASSERTED)?; + pwr_line.set_value(PWR_LINE_ASSERTED)?; state.store(OutputState::On as u8, Ordering::Relaxed); } OutputRequest::Off => { - discharge_line.set_value(DISCHARGE_LINE_ASSERTED).unwrap(); - pwr_line.set_value(1 - PWR_LINE_ASSERTED).unwrap(); + discharge_line.set_value(DISCHARGE_LINE_ASSERTED)?; + pwr_line.set_value(1 - PWR_LINE_ASSERTED)?; state.store(OutputState::Off as u8, Ordering::Relaxed); } OutputRequest::OffFloating => { - discharge_line - .set_value(1 - DISCHARGE_LINE_ASSERTED) - .unwrap(); - pwr_line.set_value(1 - PWR_LINE_ASSERTED).unwrap(); + discharge_line.set_value(1 - DISCHARGE_LINE_ASSERTED)?; + pwr_line.set_value(1 - PWR_LINE_ASSERTED)?; state.store(OutputState::OffFloating as u8, Ordering::Relaxed); } } } // Make sure to enter fail safe mode before leaving the thread - turn_off_with_reason(OutputState::Off, &pwr_line, &discharge_line, &state); + turn_off_with_reason(OutputState::Off, &pwr_line, &discharge_line, &state)?; Ok(()) })?; - let (tick, request, state) = thread_res_rx.next().await.unwrap()?; + let (tick, request, state) = thread_rx.recv().await?; // The request and state topic use the same external path, this way one // can e.g. publish "On" to the topic and be sure that the output is