Skip to content

Commit

Permalink
Have a control channel to ask rhio to import a file into minio
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha committed Aug 29, 2024
1 parent 01663d9 commit e9095d8
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 17 deletions.
45 changes: 37 additions & 8 deletions rhio/src/blobs/actor.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
use std::path::PathBuf;

use anyhow::{anyhow, Result};
use iroh_blobs::store::bao_tree::io::fsm::AsyncSliceReader;
use iroh_blobs::store::{MapEntry, Store};
use p2panda_blobs::{Blobs as BlobsHandler, DownloadBlobEvent};
use p2panda_blobs::{Blobs as BlobsHandler, DownloadBlobEvent, ImportBlobEvent};
use p2panda_core::Hash;
use s3::creds::Credentials;
use s3::{Bucket, BucketConfiguration, Region};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::StreamExt;
use tracing::{error, info};
use tracing::error;

pub enum ToBlobsActor {
ImportFile {
file_path: PathBuf,
reply: oneshot::Sender<Result<Hash>>,
},
ExportBlobMinio {
hash: Hash,
bucket_name: String,
Expand Down Expand Up @@ -86,11 +92,12 @@ where

async fn on_actor_message(&mut self, msg: ToBlobsActor) -> Result<()> {
match msg {
ToBlobsActor::ImportFile { file_path, reply } => {
let result = self.on_import_file(file_path).await;
reply.send(result).ok();
}
ToBlobsActor::DownloadBlob { hash, reply } => {
let result = self.on_download_blob(hash).await;
if result.is_ok() {
info!("downloaded blob {hash}");
}
reply.send(result).ok();
}
ToBlobsActor::ExportBlobMinio {
Expand All @@ -103,9 +110,6 @@ where
let result = self
.on_export_blob_minio(hash, bucket_name.clone(), region, credentials)
.await;
if result.is_ok() {
info!("exported blob to minio: {hash} {bucket_name}");
}
reply.send(result).ok();
}
ToBlobsActor::Shutdown { .. } => {
Expand All @@ -116,6 +120,30 @@ where
Ok(())
}

/// Imports the file at `path` into the blob store and returns the resulting blob hash.
///
/// Only the first event emitted by the import stream is inspected: `Done` yields the hash,
/// `Abort` is turned into an error carrying the reported cause, and a stream which ends without
/// emitting any event is reported as a failed import as well.
async fn on_import_file(&mut self, path: PathBuf) -> Result<Hash> {
    // @TODO: We're currently using the filesystem blob store to calculate the bao-tree hashes
    // for the file. This is the only way to retrieve the blob hash right now. In the future we
    // want to do all of this inside of MinIO and skip loading the file onto the file-system
    // first.
    //
    // `path` is already owned, so it is handed over directly instead of being cloned.
    let mut stream = Box::pin(self.blobs.import_blob(path).await);

    // Every possible outcome is terminal, a surrounding `loop` would never run a second time.
    match stream.next().await {
        Some(ImportBlobEvent::Done(hash)) => Ok(hash),
        Some(ImportBlobEvent::Abort(err)) => Err(anyhow!("failed importing blob: {err}")),
        None => Err(anyhow!("failed importing blob")),
    }
}

async fn on_download_blob(&mut self, hash: Hash) -> Result<()> {
let mut stream = Box::pin(self.blobs.download_blob(hash).await);
while let Some(event) = stream.next().await {
Expand Down Expand Up @@ -185,6 +213,7 @@ where
error!("{response}");
return Err(anyhow::anyhow!(response));
}

Ok(())
}

Expand Down
28 changes: 28 additions & 0 deletions rhio/src/blobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod actor;

use std::path::PathBuf;

use anyhow::{anyhow, Result};
use iroh_blobs::store::Store;
use p2panda_blobs::Blobs as BlobsHandler;
Expand Down Expand Up @@ -40,6 +42,32 @@ impl Blobs {
}
}

/// Imports the file at `file_path` through the blobs actor and returns its hash.
///
/// The request is sent to the actor and the hash is awaited on a oneshot channel. When
/// credentials are present in the configuration, the freshly imported blob is additionally
/// exported to MinIO using the configured region, endpoint and bucket name before the hash is
/// returned.
// @TODO: The file-system blob store is only involved because it is currently the sole way to
// obtain the bao-tree hash of a file. Eventually all of this should take place inside of MinIO
// so the file never needs to be loaded onto the file-system first.
pub async fn import_file(&self, file_path: PathBuf) -> Result<Hash> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let request = ToBlobsActor::ImportFile {
        file_path,
        reply: reply_tx,
    };
    self.blobs_actor_tx.send(request).await?;

    // The outer `?` covers a dropped reply channel, the inner one a failed import.
    let hash = reply_rx.await??;

    // Nothing to export when no credentials were configured.
    if self.config.credentials.is_none() {
        return Ok(hash);
    }

    let region = self.config.region.clone();
    let endpoint = self.config.endpoint.clone();
    let bucket_name = self.config.bucket_name.clone();
    self.export_blob_minio(hash, region, endpoint, bucket_name)
        .await?;

    Ok(hash)
}

/// Export a blob to a minio bucket.
///
/// Copies an existing blob from the blob store to the provided minio bucket.
Expand Down
53 changes: 49 additions & 4 deletions rhio/src/nats/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
mod actor;
mod consumer;

use std::path::PathBuf;

use anyhow::{Context, Result};
use async_nats::ConnectOptions;
use p2panda_net::SharedAbortingJoinHandle;
use async_nats::{Client, ConnectOptions};
use p2panda_net::{FromBytes, SharedAbortingJoinHandle};
use rhio_core::{Subject, TopicId};
use tokio::sync::{broadcast, mpsc, oneshot};
use tracing::error;
use tokio_stream::StreamExt;
use tracing::{error, warn};

use crate::config::Config;
use crate::nats::actor::{NatsActor, ToNatsActor};
pub use crate::nats::consumer::JetStreamEvent;
use crate::node::NodeControl;

#[derive(Debug)]
pub struct Nats {
Expand All @@ -20,7 +24,7 @@ pub struct Nats {
}

impl Nats {
pub async fn new(config: Config) -> Result<Self> {
pub async fn new(config: Config, node_control_tx: mpsc::Sender<NodeControl>) -> Result<Self> {
// @TODO: Add auth options to NATS client config
let nats_client =
async_nats::connect_with_options(config.nats.endpoint.clone(), ConnectOptions::new())
Expand All @@ -30,6 +34,11 @@ impl Nats {
config.nats.endpoint
))?;

// Start a "standard" NATS Core subscription (at-most-once delivery) to receive "live"
// control commands for `rhio`
spawn_control_handler(nats_client.clone(), node_control_tx);

// Start the main NATS JetStream actor to dynamically maintain "stream consumers"
let (nats_actor_tx, nats_actor_rx) = mpsc::channel(64);
let nats_actor = NatsActor::new(nats_client, nats_actor_rx);

Expand Down Expand Up @@ -95,3 +104,39 @@ impl Nats {
Ok(())
}
}

// @TODO(adz): This flow and its location are a "workaround" and might not be ideal. It can stay
// here until the full MinIO store backend implementation is finished, even though _some_ import
// control needs to be given in any case?
/// Spawns a background task which forwards control messages received via NATS Core to the node.
///
/// The task subscribes to `rhio.*` and only handles messages published on `rhio.import`. Their
/// payload is decoded into a file path and sent as `NodeControl::ImportBlob` through
/// `node_control_tx`, together with the reply subject of the message, if any. Payloads which
/// fail to decode are logged and skipped; the task ends when the subscription stream ends or
/// when sending on the control channel fails.
fn spawn_control_handler(nats_client: Client, node_control_tx: mpsc::Sender<NodeControl>) {
    tokio::spawn(async move {
        let mut subscription = match nats_client.subscribe("rhio.*").await {
            Ok(subscription) => subscription,
            Err(_) => {
                error!("failed subscribing to minio control messages");
                return;
            }
        };

        while let Some(message) = subscription.next().await {
            // Import requests are the only command handled on this channel.
            if message.subject.as_str() != "rhio.import" {
                continue;
            }

            let file_path = match PathBuf::from_bytes(&message.payload) {
                Ok(decoded) => decoded,
                Err(err) => {
                    warn!("failed parsing minio control message: {err}");
                    continue;
                }
            };

            let command = NodeControl::ImportBlob {
                file_path,
                reply_subject: message.reply,
            };

            if let Err(err) = node_control_tx.send(command).await {
                error!("failed handling minio control message: {err}");
                break;
            }
        }
    });
}
45 changes: 43 additions & 2 deletions rhio/src/node/actor.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use anyhow::{anyhow, bail, Context, Result};
use futures_util::stream::SelectAll;
use p2panda_core::{Extension, Hash, Operation};
use p2panda_net::ToBytes;
use rhio_core::{decode_operation, encode_operation, RhioExtensions, Subject, TopicId};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use tracing::{debug, error};
use tracing::{debug, error, info};

use crate::blobs::Blobs;
use crate::nats::{JetStreamEvent, Nats};
use crate::node::NodeControl;
use crate::panda::Panda;

pub enum ToNodeActor {
Expand All @@ -25,6 +27,7 @@ pub enum ToNodeActor {

pub struct NodeActor {
inbox: mpsc::Receiver<ToNodeActor>,
node_control_rx: mpsc::Receiver<NodeControl>,
nats_consumer_rx: SelectAll<BroadcastStream<JetStreamEvent>>,
p2panda_topic_rx: SelectAll<BroadcastStream<Operation<RhioExtensions>>>,
nats: Nats,
Expand All @@ -33,9 +36,16 @@ pub struct NodeActor {
}

impl NodeActor {
pub fn new(nats: Nats, panda: Panda, blobs: Blobs, inbox: mpsc::Receiver<ToNodeActor>) -> Self {
pub fn new(
nats: Nats,
panda: Panda,
blobs: Blobs,
inbox: mpsc::Receiver<ToNodeActor>,
node_control_rx: mpsc::Receiver<NodeControl>,
) -> Self {
Self {
nats,
node_control_rx,
nats_consumer_rx: SelectAll::new(),
p2panda_topic_rx: SelectAll::new(),
panda,
Expand Down Expand Up @@ -79,6 +89,11 @@ impl NodeActor {
}
}
},
Some(command) = self.node_control_rx.recv() => {
if let Err(err) = self.on_control_command(command).await {
break Err(err);
}
},
Some(Ok(event)) = self.nats_consumer_rx.next() => {
if let Err(err) = self.on_nats_event(event).await {
break Err(err);
Expand Down Expand Up @@ -275,6 +290,32 @@ impl NodeActor {
Ok(())
}

async fn on_control_command(&self, command: NodeControl) -> Result<()> {
match command {
NodeControl::ImportBlob {
file_path,
reply_subject,
} => {
debug!("received control command to import {}", file_path.display());
let hash = self.blobs.import_file(file_path.clone()).await?;
info!(
"import file {} completed, the resulting hash is: {}",
file_path.display(),
hash
);

// If the control command requested an reply via NATS Core, we will provide it!
if let Some(subject) = reply_subject {
self.nats
.publish(subject.to_string(), hash.to_bytes())
.await?;
}
}
}

Ok(())
}

async fn shutdown(&self) -> Result<()> {
self.nats.shutdown().await?;
self.panda.shutdown().await?;
Expand Down
13 changes: 13 additions & 0 deletions rhio/src/node/control.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use std::path::PathBuf;

use async_nats::Subject;

/// Control commands which are sent to the node from outside of its regular message flow, for
/// example from the NATS Core control subscription.
#[derive(Clone, Debug)]
pub enum NodeControl {
/// Node should import this file from given path into MinIO database and respond with resulting
/// bao-tree hash, optionally to NATS Core reply channel.
ImportBlob {
/// Path of the file which should be imported.
file_path: PathBuf,
/// NATS subject the resulting hash is published to, `None` if no reply was requested.
reply_subject: Option<Subject>,
},
}
12 changes: 9 additions & 3 deletions rhio/src/node/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod actor;
mod control;

use std::net::SocketAddr;

Expand All @@ -14,6 +15,7 @@ use crate::blobs::Blobs;
use crate::config::Config;
use crate::nats::Nats;
use crate::node::actor::{NodeActor, ToNodeActor};
pub use crate::node::control::NodeControl;
use crate::panda::Panda;

// @TODO: Give rhio a cool network id
Expand Down Expand Up @@ -73,13 +75,17 @@ impl Node {

// 4. Connect with NATS client to server and consume streams over "subjects" we're
// interested in. The NATS jetstream is the p2panda persistence and transport layer and
// holds all past and future operations
let nats = Nats::new(config.clone()).await?;
// holds all past and future operations.
//
// Additionally we keep a "control channel" open via NATS Core, to receive internal
// messages on handling MinIO blob imports, etc.
let (node_control_tx, node_control_rx) = mpsc::channel(32);
let nats = Nats::new(config.clone(), node_control_tx).await?;

// 5. Finally spawn actor which orchestrates blob storage and handling, p2panda networking
// and NATS JetStream consumers
let (node_actor_tx, node_actor_rx) = mpsc::channel(256);
let node_actor = NodeActor::new(nats, panda, blobs, node_actor_rx);
let node_actor = NodeActor::new(nats, panda, blobs, node_actor_rx, node_control_rx);
let actor_handle = tokio::task::spawn(async move {
if let Err(err) = node_actor.run().await {
error!("node actor failed: {err:?}");
Expand Down

0 comments on commit e9095d8

Please sign in to comment.