Skip to content
This repository has been archived by the owner on Jun 19, 2024. It is now read-only.

Commit

Permalink
refactor: change task module structure
Browse files Browse the repository at this point in the history
fix: remove uneccesarry mutable static
  • Loading branch information
Gavin-Niederman committed Dec 9, 2023
1 parent 72348f1 commit 5698cdf
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 280 deletions.
2 changes: 1 addition & 1 deletion pros/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ impl AsyncRobot for Robot {
Ok(())
}
}
async_robot!(Robot);
async_robot!(Robot);
4 changes: 2 additions & 2 deletions pros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ macro_rules! sync_robot {
}

pub mod prelude {
pub use crate::{ async_robot, sync_robot };
pub use crate::{ AsyncRobot, SyncRobot };
pub use crate::{async_robot, sync_robot};
pub use crate::{AsyncRobot, SyncRobot};

// Import Box from alloc so that it can be used in async_trait!
pub use crate::{async_trait, os_task_local, print, println};
Expand Down
57 changes: 39 additions & 18 deletions pros/src/task/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use spin::Once;

use super::current;

static mut INDEX: AtomicU32 = AtomicU32::new(0);
static INDEX: AtomicU32 = AtomicU32::new(0);

// Unsafe because you can change the thread local storage while it is being read.
// This requires you to leak val so that you can be sure it lives the entire task.
Expand All @@ -21,6 +21,23 @@ unsafe fn task_local_storage_get<T>(task: pros_sys::task_t, index: u32) -> Optio
val.cast::<T>().as_ref()
}

fn fetch_storage() -> &'static RefCell<ThreadLocalStorage> {
let current = current();

// Get the thread local storage for this task.
// Creating it if it doesn't exist.
// This is safe as long as index 0 of the freeRTOS TLS is never set to any other type.
unsafe {
task_local_storage_get(current.task, 0).unwrap_or_else(|| {
let storage = Box::leak(Box::new(RefCell::new(ThreadLocalStorage {
data: BTreeMap::new(),
})));
task_local_storage_set(current.task, storage, 0);
storage
})
}
}

struct ThreadLocalStorage {
pub data: BTreeMap<usize, NonNull<()>>,
}
Expand All @@ -38,35 +55,39 @@ impl<T: 'static> LocalKey<T> {
}
}

fn index(&'static self) -> &usize {
self.index.call_once(|| INDEX.fetch_add(1, core::sync::atomic::Ordering::Relaxed) as _)
}

pub fn set(&'static self, val: T) {
let storage = fetch_storage();
let index = *self.index();

let val = Box::leak(Box::new(val));

storage.borrow_mut().data.insert(index, NonNull::new((val as *mut T).cast()).unwrap());
}

pub fn with<F, R>(&'static self, f: F) -> R
where
F: FnOnce(&'static T) -> R,
{
let index = *self.index.call_once(|| unsafe {
INDEX.fetch_add(1, core::sync::atomic::Ordering::SeqCst) as usize
});
let current = current();

// Get the thread local storage for this task.
// Creating it if it doesn't exist.
let storage = unsafe {
task_local_storage_get(current.task, 0).unwrap_or_else(|| {
let storage = Box::leak(Box::new(RefCell::new(ThreadLocalStorage {
data: BTreeMap::new(),
})));
task_local_storage_set(current.task, storage, 0);
storage
})
};
let storage = fetch_storage();
let index = *self.index();

// Make sure that the borrow is dropped if the if does not execute.
// This shouldn't be necessary, but caution is good.
{
if let Some(val) = storage.borrow_mut().data.get(&index) {
return f(unsafe { val.cast().as_ref() });
}
}

let val = Box::leak(Box::new((self.init)()));
storage.borrow_mut().data.insert(index, NonNull::new((val as *mut T).cast::<()>()).unwrap());
storage
.borrow_mut()
.data
.insert(index, NonNull::new((val as *mut T).cast::<()>()).unwrap());
f(val)
}
}
Expand Down
257 changes: 254 additions & 3 deletions pros/src/task/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,262 @@
pub mod local;
pub mod task;

pub use task::*;

use core::hash::Hash;
use core::{future::Future, task::Poll};

use crate::async_runtime::executor::EXECUTOR;
use crate::error::{bail_on, map_errno};

use snafu::Snafu;


/// Creates a task to be run 'asynchronously' (More information at the [FreeRTOS docs](https://www.freertos.org/taskandcr.html)).
/// Takes in a closure that can move variables if needed.
/// If your task has a loop it is advised to use [`sleep(duration)`](sleep) so that the task does not take up necessary system resources.
/// Tasks should be long-living; starting many tasks can be slow and is usually not necessary.
pub fn spawn<F>(f: F) -> TaskHandle
where
F: FnOnce() + Send + 'static,
{
Builder::new().spawn(f).expect("Failed to spawn task")
}

fn spawn_inner<F: FnOnce() + Send + 'static>(
function: F,
priority: TaskPriority,
stack_depth: TaskStackDepth,
name: Option<&str>,
) -> Result<TaskHandle, SpawnError> {
let mut entrypoint = TaskEntrypoint { function };
let name = alloc::ffi::CString::new(name.unwrap_or("<unnamed>"))
.unwrap()
.into_raw();
unsafe {
let task = bail_on!(
core::ptr::null(),
pros_sys::task_create(
Some(TaskEntrypoint::<F>::cast_and_call_external),
&mut entrypoint as *mut _ as *mut core::ffi::c_void,
priority as _,
stack_depth as _,
name,
)
);

_ = alloc::ffi::CString::from_raw(name);

Ok(TaskHandle { task })
}
}

/// An owned permission to perform actions on a task.
#[derive(Clone)]
pub struct TaskHandle {
pub(crate) task: pros_sys::task_t,
}
unsafe impl Send for TaskHandle {}
impl Hash for TaskHandle {
fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
self.task.hash(state)
}
}

impl PartialEq for TaskHandle {
fn eq(&self, other: &Self) -> bool {
self.task == other.task
}
}
impl Eq for TaskHandle {}

impl TaskHandle {
/// Pause execution of the task.
/// This can have unintended consequences if you are not careful,
/// for example, if this task is holding a mutex when paused, there is no way to retrieve it until the task is unpaused.
pub fn pause(&self) {
unsafe {
pros_sys::task_suspend(self.task);
}
}

/// Resumes execution of the task.
pub fn unpause(&self) {
unsafe {
pros_sys::task_resume(self.task);
}
}

/// Sets the task's priority, allowing you to control how much cpu time is allocated to it.
pub fn set_priority(&self, priority: impl Into<u32>) {
unsafe {
pros_sys::task_set_priority(self.task, priority.into());
}
}

/// Get the state of the task.
pub fn state(&self) -> TaskState {
unsafe { pros_sys::task_get_state(self.task).into() }
}

/// Send a notification to the task.
pub fn notify(&self) {
unsafe {
pros_sys::task_notify(self.task);
}
}

/// Waits for the task to finish, and then deletes it.
pub fn join(self) {
unsafe {
pros_sys::task_join(self.task);
}
}

/// Aborts the task and consumes it. Memory allocated by the task will not be freed.
pub fn abort(self) {
unsafe {
pros_sys::task_delete(self.task);
}
}
}

/// An ergonomic builder for tasks. Alternatively you can use [`spawn`].
#[derive(Default)]
pub struct Builder<'a> {
name: Option<&'a str>,
priority: Option<TaskPriority>,
stack_depth: Option<TaskStackDepth>,
}

impl<'a> Builder<'a> {
/// Creates a task builder.
pub fn new() -> Self {
Self::default()
}

/// Sets the name of the task, this is useful for debugging.
pub fn name(mut self, name: &'a str) -> Self {
self.name = Some(name);
self
}

/// Sets the priority of the task (how much time the scheduler gives to it.).
pub fn priority(mut self, priority: TaskPriority) -> Self {
self.priority = Some(priority);
self
}

/// Sets how large the stack for the task is.
/// This can usually be set to default
pub fn stack_depth(mut self, stack_depth: TaskStackDepth) -> Self {
self.stack_depth = Some(stack_depth);
self
}

/// Builds and spawns the task
pub fn spawn<F>(self, function: F) -> Result<TaskHandle, SpawnError>
where
F: FnOnce() + Send + 'static,
{
spawn_inner(
function,
self.priority.unwrap_or_default(),
self.stack_depth.unwrap_or_default(),
self.name,
)
}
}

/// Represents the current state of a task.
pub enum TaskState {
/// The task is currently utilizing the processor
Running,
/// The task is currently yielding but may run in the future
Ready,
/// The task is blocked. For example, it may be [`sleep`]ing or waiting on a mutex.
/// Tasks that are in this state will usually return to the task queue after a set timeout.
Blocked,
/// The task is suspended. For example, it may be waiting on a mutex or semaphore.
Suspended,
/// The task has been deleted using [`TaskHandle::abort`].
Deleted,
/// The task's state is invalid somehow
Invalid,
}

impl From<u32> for TaskState {
fn from(value: u32) -> Self {
match value {
pros_sys::E_TASK_STATE_RUNNING => Self::Running,
pros_sys::E_TASK_STATE_READY => Self::Ready,
pros_sys::E_TASK_STATE_BLOCKED => Self::Blocked,
pros_sys::E_TASK_STATE_SUSPENDED => Self::Suspended,
pros_sys::E_TASK_STATE_DELETED => Self::Deleted,
pros_sys::E_TASK_STATE_INVALID => Self::Invalid,
_ => Self::Invalid,
}
}
}

/// Represents how much time the cpu should spend on this task.
/// (Otherwise known as the priority)
#[repr(u32)]
pub enum TaskPriority {
High = 16,
Default = 8,
Low = 1,
}

impl Default for TaskPriority {
fn default() -> Self {
Self::Default
}
}

impl From<TaskPriority> for u32 {
fn from(val: TaskPriority) -> Self {
val as u32
}
}

/// Represents how large of a stack the task should get.
/// Tasks that don't have any or many variables and/or don't need floats can use the low stack depth option.
#[repr(u32)]
pub enum TaskStackDepth {
Default = 8192,
Low = 512,
}

impl Default for TaskStackDepth {
fn default() -> Self {
Self::Default
}
}

struct TaskEntrypoint<F> {
function: F,
}

impl<F> TaskEntrypoint<F>
where
F: FnOnce(),
{
unsafe extern "C" fn cast_and_call_external(this: *mut core::ffi::c_void) {
let this = this.cast::<Self>().read();

(this.function)()
}
}

#[derive(Debug, Snafu)]
pub enum SpawnError {
#[snafu(display("The stack cannot be used as the TCB was not created."))]
TCBNotCreated,
}

map_errno! {
SpawnError {
ENOMEM => SpawnError::TCBNotCreated,
}
}

/// Blocks the current task for the given amount of time, if you are in an async function.
/// ## you probably don't want to use this.
Expand Down
Loading

0 comments on commit 5698cdf

Please sign in to comment.