From e9095d8f47363a04e3f3e249e3f26e4b3f31bb8a Mon Sep 17 00:00:00 2001 From: adz Date: Thu, 29 Aug 2024 18:48:11 +0200 Subject: [PATCH] Have a control channel to ask rhio to import a file into minio --- rhio/src/blobs/actor.rs | 45 ++++++++++++++++++++++++++++------ rhio/src/blobs/mod.rs | 28 +++++++++++++++++++++ rhio/src/nats/mod.rs | 53 +++++++++++++++++++++++++++++++++++++--- rhio/src/node/actor.rs | 45 ++++++++++++++++++++++++++++++++-- rhio/src/node/control.rs | 13 ++++++++++ rhio/src/node/mod.rs | 12 ++++++--- 6 files changed, 179 insertions(+), 17 deletions(-) create mode 100644 rhio/src/node/control.rs diff --git a/rhio/src/blobs/actor.rs b/rhio/src/blobs/actor.rs index 8f941d1..563f6c1 100644 --- a/rhio/src/blobs/actor.rs +++ b/rhio/src/blobs/actor.rs @@ -1,15 +1,21 @@ +use std::path::PathBuf; + use anyhow::{anyhow, Result}; use iroh_blobs::store::bao_tree::io::fsm::AsyncSliceReader; use iroh_blobs::store::{MapEntry, Store}; -use p2panda_blobs::{Blobs as BlobsHandler, DownloadBlobEvent}; +use p2panda_blobs::{Blobs as BlobsHandler, DownloadBlobEvent, ImportBlobEvent}; use p2panda_core::Hash; use s3::creds::Credentials; use s3::{Bucket, BucketConfiguration, Region}; use tokio::sync::{mpsc, oneshot}; use tokio_stream::StreamExt; -use tracing::{error, info}; +use tracing::error; pub enum ToBlobsActor { + ImportFile { + file_path: PathBuf, + reply: oneshot::Sender>, + }, ExportBlobMinio { hash: Hash, bucket_name: String, @@ -86,11 +92,12 @@ where async fn on_actor_message(&mut self, msg: ToBlobsActor) -> Result<()> { match msg { + ToBlobsActor::ImportFile { file_path, reply } => { + let result = self.on_import_file(file_path).await; + reply.send(result).ok(); + } ToBlobsActor::DownloadBlob { hash, reply } => { let result = self.on_download_blob(hash).await; - if result.is_ok() { - info!("downloaded blob {hash}"); - } reply.send(result).ok(); } ToBlobsActor::ExportBlobMinio { @@ -103,9 +110,6 @@ where let result = self .on_export_blob_minio(hash, bucket_name.clone(), region, credentials) .await; - if result.is_ok() { - info!("exported blob to minio: {hash} {bucket_name}"); - } reply.send(result).ok(); } ToBlobsActor::Shutdown { .. } => { @@ -116,6 +120,30 @@ where Ok(()) } + async fn on_import_file(&mut self, path: PathBuf) -> Result { + // @TODO: We're currently using the filesystem blob store to calculate the bao-tree hashes + // for the file. This is the only way to retrieve the blob hash right now. In the future we + // want to do all of this inside of MinIO and skip loading the file onto the file-system + // first. + let mut stream = Box::pin(self.blobs.import_blob(path.to_path_buf()).await); + + let hash = loop { + match stream.next().await { + Some(ImportBlobEvent::Done(hash)) => { + break Ok(hash); + } + Some(ImportBlobEvent::Abort(err)) => { + break Err(anyhow!("failed importing blob: {err}")); + } + None => { + break Err(anyhow!("failed importing blob")); + } + } + }?; + + Ok(hash) + } + async fn on_download_blob(&mut self, hash: Hash) -> Result<()> { let mut stream = Box::pin(self.blobs.download_blob(hash).await); while let Some(event) = stream.next().await { @@ -185,6 +213,7 @@ where error!("{response}"); return Err(anyhow::anyhow!(response)); } + Ok(()) } diff --git a/rhio/src/blobs/mod.rs b/rhio/src/blobs/mod.rs index 6b33f89..3b10038 100644 --- a/rhio/src/blobs/mod.rs +++ b/rhio/src/blobs/mod.rs @@ -1,5 +1,7 @@ mod actor; +use std::path::PathBuf; + use anyhow::{anyhow, Result}; use iroh_blobs::store::Store; use p2panda_blobs::Blobs as BlobsHandler; @@ -40,6 +42,32 @@ impl Blobs { } } + /// Import a file into the node's blob store on the file system and sync it with the internal + /// MinIO database. + // @TODO: We're currently using the filesystem blob store to calculate the bao-tree hashes + // for the file. This is the only way to retrieve the blob hash right now. In the future we + // want to do all of this inside of MinIO and skip loading the file onto the file-system + // first. + pub async fn import_file(&self, file_path: PathBuf) -> Result { + let (reply, reply_rx) = oneshot::channel(); + self.blobs_actor_tx + .send(ToBlobsActor::ImportFile { file_path, reply }) + .await?; + let hash = reply_rx.await??; + + if self.config.credentials.is_some() { + self.export_blob_minio( + hash, + self.config.region.clone(), + self.config.endpoint.clone(), + self.config.bucket_name.clone(), + ) + .await?; + } + + Ok(hash) + } + /// Export a blob to a minio bucket. /// /// Copies an existing blob from the blob store to the provided minio bucket. diff --git a/rhio/src/nats/mod.rs b/rhio/src/nats/mod.rs index 713aad8..546c60b 100644 --- a/rhio/src/nats/mod.rs +++ b/rhio/src/nats/mod.rs @@ -1,16 +1,20 @@ mod actor; mod consumer; +use std::path::PathBuf; + use anyhow::{Context, Result}; -use async_nats::ConnectOptions; -use p2panda_net::SharedAbortingJoinHandle; +use async_nats::{Client, ConnectOptions}; +use p2panda_net::{FromBytes, SharedAbortingJoinHandle}; use rhio_core::{Subject, TopicId}; use tokio::sync::{broadcast, mpsc, oneshot}; -use tracing::error; +use tokio_stream::StreamExt; +use tracing::{error, warn}; use crate::config::Config; use crate::nats::actor::{NatsActor, ToNatsActor}; pub use crate::nats::consumer::JetStreamEvent; +use crate::node::NodeControl; #[derive(Debug)] pub struct Nats { @@ -20,7 +24,7 @@ pub struct Nats { } impl Nats { - pub async fn new(config: Config) -> Result { + pub async fn new(config: Config, node_control_tx: mpsc::Sender) -> Result { // @TODO: Add auth options to NATS client config let nats_client = async_nats::connect_with_options(config.nats.endpoint.clone(), ConnectOptions::new()) @@ -30,6 +34,11 @@ impl Nats { config.nats.endpoint ))?; + // Start a "standard" NATS Core subscription (at-most-once delivery) to receive "live" + // control commands for `rhio` + spawn_control_handler(nats_client.clone(), node_control_tx); + + // Start the main NATS JetStream actor to dynamically maintain "stream consumers" let (nats_actor_tx, nats_actor_rx) = mpsc::channel(64); let nats_actor = NatsActor::new(nats_client, nats_actor_rx); @@ -95,3 +104,39 @@ impl Nats { Ok(()) } } + +// @TODO(adz): This might not be the ideal flow or place for it and serves as a "workaround". We +// can keep it here until we've finished the full MinIO store backend implementation, even though +// _some_ import control needs to be given in any case? +fn spawn_control_handler(nats_client: Client, node_control_tx: mpsc::Sender) { + tokio::spawn(async move { + let Ok(mut subscription) = nats_client.subscribe("rhio.*").await else { + error!("failed subscribing to minio control messages"); + return; + }; + + while let Some(message) = subscription.next().await { + if message.subject.as_str() != "rhio.import" { + continue; + } + + match PathBuf::from_bytes(&message.payload) { + Ok(file_path) => { + if let Err(err) = node_control_tx + .send(NodeControl::ImportBlob { + file_path, + reply_subject: message.reply, + }) + .await + { + error!("failed handling minio control message: {err}"); + break; + } + } + Err(err) => { + warn!("failed parsing minio control message: {err}"); + } + }; + } + }); +} diff --git a/rhio/src/node/actor.rs b/rhio/src/node/actor.rs index 3b1c487..fa8d642 100644 --- a/rhio/src/node/actor.rs +++ b/rhio/src/node/actor.rs @@ -1,14 +1,16 @@ use anyhow::{anyhow, bail, Context, Result}; use futures_util::stream::SelectAll; use p2panda_core::{Extension, Hash, Operation}; +use p2panda_net::ToBytes; use rhio_core::{decode_operation, encode_operation, RhioExtensions, Subject, TopicId}; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; -use tracing::{debug, error}; +use tracing::{debug, error, info}; use crate::blobs::Blobs; use crate::nats::{JetStreamEvent, Nats}; +use crate::node::NodeControl; use crate::panda::Panda; pub enum ToNodeActor { @@ -25,6 +27,7 @@ pub enum ToNodeActor { pub struct NodeActor { inbox: mpsc::Receiver, + node_control_rx: mpsc::Receiver, nats_consumer_rx: SelectAll>, p2panda_topic_rx: SelectAll>>, nats: Nats, @@ -33,9 +36,16 @@ pub struct NodeActor { } impl NodeActor { - pub fn new(nats: Nats, panda: Panda, blobs: Blobs, inbox: mpsc::Receiver) -> Self { + pub fn new( + nats: Nats, + panda: Panda, + blobs: Blobs, + inbox: mpsc::Receiver, + node_control_rx: mpsc::Receiver, + ) -> Self { Self { nats, + node_control_rx, nats_consumer_rx: SelectAll::new(), p2panda_topic_rx: SelectAll::new(), panda, @@ -79,6 +89,11 @@ impl NodeActor { } } }, + Some(command) = self.node_control_rx.recv() => { + if let Err(err) = self.on_control_command(command).await { + break Err(err); + } + }, Some(Ok(event)) = self.nats_consumer_rx.next() => { if let Err(err) = self.on_nats_event(event).await { break Err(err); @@ -275,6 +290,32 @@ impl NodeActor { Ok(()) } + async fn on_control_command(&self, command: NodeControl) -> Result<()> { + match command { + NodeControl::ImportBlob { + file_path, + reply_subject, + } => { + debug!("received control command to import {}", file_path.display()); + let hash = self.blobs.import_file(file_path.clone()).await?; + info!( + "import file {} completed, the resulting hash is: {}", + file_path.display(), + hash + ); + + // If the control command requested an reply via NATS Core, we will provide it! + if let Some(subject) = reply_subject { + self.nats + .publish(subject.to_string(), hash.to_bytes()) + .await?; + } + } + } + + Ok(()) + } + async fn shutdown(&self) -> Result<()> { self.nats.shutdown().await?; self.panda.shutdown().await?; diff --git a/rhio/src/node/control.rs b/rhio/src/node/control.rs new file mode 100644 index 0000000..cd05ff0 --- /dev/null +++ b/rhio/src/node/control.rs @@ -0,0 +1,13 @@ +use std::path::PathBuf; + +use async_nats::Subject; + +#[derive(Clone, Debug)] +pub enum NodeControl { + /// Node should import this file from given path into MinIO database and respond with resulting + /// bao-tree hash, optionally to NATS Core reply channel. + ImportBlob { + file_path: PathBuf, + reply_subject: Option, + }, +} diff --git a/rhio/src/node/mod.rs b/rhio/src/node/mod.rs index 0a8f034..0c1d55e 100644 --- a/rhio/src/node/mod.rs +++ b/rhio/src/node/mod.rs @@ -1,4 +1,5 @@ mod actor; +mod control; use std::net::SocketAddr; @@ -14,6 +15,7 @@ use crate::blobs::Blobs; use crate::config::Config; use crate::nats::Nats; use crate::node::actor::{NodeActor, ToNodeActor}; +pub use crate::node::control::NodeControl; use crate::panda::Panda; // @TODO: Give rhio a cool network id @@ -73,13 +75,17 @@ impl Node { // 4. Connect with NATS client to server and consume streams over "subjects" we're // interested in. The NATS jetstream is the p2panda persistance and transport layer and - // holds all past and future operations - let nats = Nats::new(config.clone()).await?; + // holds all past and future operations. + // + // Additionally we keep an "control channel" open via NATS Core, to receive internal + // messages on handling MinIO blob imports, etc. + let (node_control_tx, node_control_rx) = mpsc::channel(32); + let nats = Nats::new(config.clone(), node_control_tx).await?; // 5. Finally spawn actor which orchestrates blob storage and handling, p2panda networking // and NATS JetStream consumers let (node_actor_tx, node_actor_rx) = mpsc::channel(256); - let node_actor = NodeActor::new(nats, panda, blobs, node_actor_rx); + let node_actor = NodeActor::new(nats, panda, blobs, node_actor_rx, node_control_rx); let actor_handle = tokio::task::spawn(async move { if let Err(err) = node_actor.run().await { error!("node actor failed: {err:?}");