Skip to content

Commit

Permalink
rewrite db using sqlx. the one I wanted to avoid but it works async...
Browse files Browse the repository at this point in the history
  • Loading branch information
Olivier committed Feb 26, 2024
1 parent cadff0e commit 94ae112
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 57 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ edition = "2021"

[dependencies]
anyhow = "1.0.80"
sqlx = { version = "0.7", features = [ "runtime-tokio", "tls-rustls", "sqlite"] }
rppal = "0.17.1"
rusqlite = "0.31.0"
tokio = { version = "1", features = ["rt-multi-thread", "net", "time", "macros", "signal"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
Expand Down
3 changes: 3 additions & 0 deletions env.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export DATABASE_URL="sqlite:./db.sqlite"
#export RUST_LOG="error,robot=debug,store=debug"
export RUST_LOG="debug"
11 changes: 11 additions & 0 deletions recreate_db.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash
set -e
set -v
DB="./db.sqlite"
if test -f "$DB";
then
rm $DB;
fi

sqlite3 $DB "CREATE TABLE IF NOT EXISTS wind (ts REAL PRIMARY KEY, vel REAL, direction INTEGER)"

8 changes: 4 additions & 4 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use warp::Filter;
use crate::davis::Davis;

async fn state_query(sensor: Arc<Davis>) -> Result<impl warp::Reply, warp::Rejection> {
let speed = sensor.get_current_wind();
Ok(warp::reply::html(format!("Current speed is {}", speed)))
let speed = sensor.last_data().await;
Ok(warp::reply::html(format!("Current speed is {:?}", speed)))
}

pub async fn run_server(
Expand All @@ -21,8 +21,8 @@ pub async fn run_server(
let hello = warp::path!("hello" / String).map(|name| format!("Hello, {}!", name));

let live = warp::get()
.and(warp::path("v1"))
.and(warp::path("current_wind"))
.and(warp::path("wind"))
.and(warp::path("last_data"))
.and(warp::path::end())
.and(with_context.clone())
.and_then(state_query);
Expand Down
2 changes: 1 addition & 1 deletion src/bin/wind_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio::signal;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let davis = Arc::new(Davis::connect().await?);
let davis = Arc::new(Davis::connect(String::from("./db.sqlite")).await?);
let http_server = WindServer::run(davis.clone(), "0.0.0.0:8080".parse()?).await;

match signal::ctrl_c().await {
Expand Down
18 changes: 12 additions & 6 deletions src/davis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,37 @@ use anyhow::Result;
use rppal::gpio::{Gpio, Trigger};
use tokio::time::sleep;

use crate::db::{Measurement, DB};

#[derive(Debug)]
pub struct Davis {
counting_handle: std::thread::JoinHandle<Result<()>>,
update_handle: tokio::task::JoinHandle<Result<()>>,
period: f64,
db: Arc<DB>,
}

impl Davis {
pub async fn connect() -> Result<Self> {
pub async fn connect(db_path: String) -> Result<Self> {
dbg!("start connect to Davis sensor");
let db = Arc::new(DB::connect(db_path).await?);
let period = 5.0;
let counter = Arc::new(AtomicU64::new(0));
let counter_ptr = counter.clone();
let counting_handle = std::thread::spawn(|| counting_sync_loop(counter_ptr));
let db_clone = db.clone();
let update_handle =
tokio::task::spawn(async move { fetch_data_loop(period, counter).await });
tokio::task::spawn(async move { fetch_data_loop(period, counter, db_clone).await });
dbg!("end connect");
Ok(Davis {
counting_handle,
update_handle,
period,
db,
})
}
pub fn get_current_wind(&self) -> f64 {
1.23
pub async fn last_data(&self) -> Result<Measurement> {
self.db.last_data().await
}
}

Expand All @@ -58,7 +64,7 @@ pub fn counting_sync_loop(counter: Arc<AtomicU64>) -> Result<()> {
}
}

pub async fn fetch_data_loop(period: f64, counter: Arc<AtomicU64>) -> Result<()> {
pub async fn fetch_data_loop(period: f64, counter: Arc<AtomicU64>, db: Arc<DB>) -> Result<()> {
let mut last_call = Instant::now();
loop {
let now = Instant::now();
Expand All @@ -70,7 +76,7 @@ pub async fn fetch_data_loop(period: f64, counter: Arc<AtomicU64>) -> Result<()>
dbg!(count, elapsed, counter.load(Ordering::SeqCst));
let vel = wind_speed_mph * 0.44704;
dbg!(&vel);

db.insert_measurement(vel, 0).await?;
sleep(Duration::from_secs_f64(period)).await;
}
}
132 changes: 87 additions & 45 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,85 +1,127 @@
use std::{
path::Path,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::Result;
use rusqlite::Connection;
use sqlx::migrate::MigrateDatabase;
use sqlx::{Sqlite, SqlitePool};

#[derive(Debug)]
struct DB {
conn: Connection,
pub struct DB {
pool: SqlitePool,
}

#[derive(Debug)]
struct Measurement {
pub struct Measurement {
ts: SystemTime,
vel: f64,
direction: u16,
}

impl DB {
pub fn connect(path: &Path) -> Result<Self> {
tracing::warn!("Opening sqlite DB at {:?}", &path);
let conn = Connection::open(path)?;
pub async fn connect(path: String) -> Result<Self> {
if !Sqlite::database_exists(&path).await.unwrap_or(false) {
println!("Creating database {}", &path);
match Sqlite::create_database(&path).await {
Ok(_) => println!("Create db success"),
Err(error) => panic!("error: {}", error),
}
} else {
println!("Database already exists");
}

let pool = sqlx::SqlitePool::connect(path.as_str()).await?;
tracing::warn!("DB opened");
Ok(Self { conn })
let db = Self { pool };
db.create_tables(false).await?;
Ok(db)
}
pub fn create_tables(&self, force_delete: bool) -> Result<()> {

pub async fn create_tables(&self, force_delete: bool) -> Result<()> {
if force_delete {
tracing::warn!("Force deleting tables");
self.conn.execute("DROP TABLE IF EXISTS wind", ())?;
sqlx::query!("DROP TABLE IF EXISTS wind")
.execute(&self.pool)
.await?;
}
tracing::warn!("Creating tables");
self.conn.execute(
sqlx::query!(
"CREATE TABLE IF NOT EXISTS wind (
ts REAL PRIMARY KEY,
vel REAL,
direction INTEGER
)",
(), // empty list of parameters.
)?;
)"
)
.execute(&self.pool)
.await?;

tracing::warn!("Tables created");
Ok(())
}

pub fn insert_measurement(&self, vel: f64, direction: u16) -> Result<()> {
pub async fn insert_measurement(&self, vel: f64, direction: u16) -> Result<()> {
let ts = SystemTime::now().duration_since(UNIX_EPOCH)?;
self.insert_measurement_at_t(ts, vel, direction)
self.insert_measurement_at_t(ts, vel, direction).await
}

pub fn insert_measurement_at_t(
pub async fn insert_measurement_at_t(
&self,
ts_since_epoch: Duration,
vel: f64,
direction: u16,
) -> Result<()> {
self.conn.execute(
let ts = ts_since_epoch.as_secs_f64();
sqlx::query!(
"INSERT INTO wind (ts, vel, direction) VALUES (?1, ?2, ?3)",
(&ts_since_epoch.as_secs_f64(), &vel, &direction),
)?;
ts,
vel,
direction,
)
.execute(&self.pool)
.await?;
Ok(())
}

pub fn data_since(&self, duration: Duration) -> Result<Vec<Measurement>> {
pub async fn data_since(&self, duration: Duration) -> Result<Vec<Measurement>> {
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
let threshold = now.as_secs_f64() - duration.as_secs_f64();
let mut stmt = self
.conn
.prepare("SELECT ts, vel, direction FROM wind WHERE ts > ?1")?;
let rows = stmt.query_map([threshold], |row| {
Ok(Measurement {
ts: UNIX_EPOCH + Duration::from_secs_f64(row.get(0)?),
vel: row.get(1)?,
direction: row.get(2)?,
})
})?;
let res = sqlx::query!(
"SELECT ts, vel, direction FROM wind WHERE ts > ?1",
threshold
)
.fetch_all(&self.pool)
.await?;

let mut measurements = Vec::new();
for row in rows {
measurements.push(row?)
for row in res {
let mesurement = Measurement {
ts: UNIX_EPOCH
+ Duration::from_secs_f64(row.ts.ok_or_else(|| anyhow::anyhow!("Not found"))?),
vel: row.vel.ok_or_else(|| anyhow::anyhow!("Not found"))?,
direction: row
.direction
.ok_or_else(|| anyhow::anyhow!("Not found"))?
.try_into()?,
};
measurements.push(mesurement);
}
dbg!(&measurements);
Ok(measurements)
}

pub async fn last_data(&self) -> Result<Measurement> {
let row = sqlx::query!("SELECT ts, vel, direction FROM wind ORDER BY ts DESC LIMIT 1",)
.fetch_one(&self.pool)
.await?;
dbg!(&row);
Ok(Measurement {
ts: UNIX_EPOCH
+ Duration::from_secs_f64(row.ts.ok_or_else(|| anyhow::anyhow!("Not found"))?),
vel: row.vel.ok_or_else(|| anyhow::anyhow!("Not found"))?,
direction: row
.direction
.ok_or_else(|| anyhow::anyhow!("Not found"))?
.try_into()?,
})
}
}

#[cfg(test)]
Expand All @@ -99,21 +141,21 @@ mod tests {
});
}

#[test]
fn test_db() -> Result<()> {
#[tokio::test]
async fn test_db() -> Result<()> {
init_tracing();
let db = DB::connect(Path::new("./test_db.sqlite"))?;
db.create_tables(true)?;
let db = DB::connect("./test_db.sqlite".to_string()).await?;
db.create_tables(true).await?;
let ts = SystemTime::now().duration_since(UNIX_EPOCH)? - Duration::from_secs(10);
db.insert_measurement_at_t(ts, 50000.0, 2)?;
db.insert_measurement_at_t(ts, 50000.0, 2).await?;
for i in 1..10 {
db.insert_measurement(i as f64, 2)?;
db.insert_measurement(i as f64, 2).await?;
}
let data = db.data_since(Duration::from_secs(5))?;
let data = db.data_since(Duration::from_secs(5)).await?;
let mean: f64 = data.iter().map(|m| m.vel).sum::<f64>() / data.len() as f64;
dbg!(&data, mean);
assert_eq!(mean, 5.0);
let data = db.data_since(Duration::from_secs(15))?;
let data = db.data_since(Duration::from_secs(15)).await?;
let mean: f64 = data.iter().map(|m| m.vel).sum::<f64>() / data.len() as f64;
dbg!(&data, mean);
assert!(mean > 10.0);
Expand Down

0 comments on commit 94ae112

Please sign in to comment.