Skip to content

Commit

Permalink
Glue together NATS JetStream, MinIO blob store and p2panda gossip (#58)
Browse files Browse the repository at this point in the history
* Clarify that gossip is _not_ taking place during init

* Glue together NATS JetStream and p2panda message streams

* Process blob announcements

* Realized that we need to define the topic manually

* Remove unused fields

* Clippy and remove unused code

* Define streams in config.toml file

* Update p2panda dependencies

* Do not serialize blob header when not set

* Fix getting len-prefixed data by wrongly using to_bytes

* Add more context to errors

* When only setting level for tracing, still scope it to rhio

* Wait until initialised before entering gossip

* Do allow duplicate operations arriving via nats

* Have a control channel to ask rhio to import a file into minio

* rhio-client can announce blob hashes now as well

* Import paths are encoded as regular strings

* Do not check for given credentials to handle minio

* Clean up error handling, add a comment

* Fix issue detecting blob announcements in rhio-client

* Show successful blob sync in logs

* Make clippy happy

* Add issues to TODOs

* Remove TODO

* Update README.md

* Nicer config.example.toml file

* Update READMEs

* Remove docker-compose.yaml for now

* One more README.md update

* Typos

* Rename panda module to network

* Remove all unused dependencies

* Write more about development

* Fix setting the filter string for tracing

* Improve README.md

* Grammar fixes

* Update README.md

* Update README.md

* Make clippy happy

* We can not wait for ack from publishing to NATS core

* Silently fail when ready oneshot channel got dropped

* Update rhio/config.example.toml

Co-authored-by: Sam Andreae <contact@samandreae.com>

* Clarify some things

---------

Co-authored-by: Sam Andreae <contact@samandreae.com>
  • Loading branch information
adzialocha and sandreae committed Aug 30, 2024
1 parent 4e02b3a commit 2d8b1d4
Show file tree
Hide file tree
Showing 26 changed files with 1,093 additions and 679 deletions.
325 changes: 80 additions & 245 deletions Cargo.lock

Large diffs are not rendered by default.

152 changes: 142 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,27 +1,159 @@
# rhio

> ℹ️ rhio is currently in an experimental development phase and (not yet) intended to be used in production setups. See the [issue tracker](https://github.com/HIRO-MicroDataCenters-BV/rhio/issues) for missing features.
rhio is a peer-to-peer message stream and blob storage solution allowing processes to rapidly exchange messages and efficiently replicate large blobs without any centralised coordination.

rhio has been designed to be integrated into a Kubernetes cluster where _internal_ cluster messaging and persistence is handled centrally via [NATS JetStream](https://docs.nats.io/nats-concepts/jetstream) while _external_ cluster messaging is decentralised and handled via [p2panda](https://p2panda.org). Blobs of any size are replicated separately with efficient [bao encoding](https://github.com/oconnor663/bao/tree/master) and stored in a [MinIO](https://min.io/) database.
rhio has been designed to be integrated into a Kubernetes cluster where _internal_ cluster messaging and persistence is handled centrally via [NATS JetStream](https://docs.nats.io/nats-concepts/jetstream) while _external_ cluster messaging is decentralised and handled via [p2panda](https://p2panda.org). Blobs of any size are replicated separately with efficient [bao encoding](https://github.com/oconnor663/bao) and stored in a [MinIO](https://min.io/) database.

Similar to NATS JetStream, any number of streams can be subscribed to and filtered by "subjects".

<details>
<summary>Show diagram</summary>

```
.. other clusters ..
▲ │
Cluster │ │
┌──────────────────────────────────────────────────────────┼──┼──────┐
│ │ │ │
│ ┌─────────────────┐ ┌──────────┐ Publish ┌───┼──▼───┐ │
│ │ ┼─────────► ┼─────────────► │ │
│ │ .. other │ │ NATS │ │ rhio │ │
│ │ processes .. │ │ Server │ Subscribe │ p2p node │ │
│ │ ◄─────────┼ ◄─────────────┼ │ │
│ └─────────────────┘ └──────────┘ └───▲──┬───┘ │
│ │ │ │
│ │ │ │
│ ┌────┼──▼────┐ │
│ │ MinIO S3 │ │
│ │ Blob Store │ │
│ └────────────┘ │
└────────────────────────────────────────────────────────────────────┘
```
</details>

## Usage

`TODO`
1. Copy the [configuration file](/rhio/config.example.toml) and adjust it to your setup: `cp config.example.toml config.toml`
2. Run the `rhio` process via `rhio -c config.toml`
3. The process can be further configured via ENV vars or command line arguments:

```
Peer-to-peer message and blob streaming with MinIO and NATS JetStream support
Usage: rhio [OPTIONS]
Options:
-c, --config <PATH>
Path to "config.toml" file for further configuration.
When not set the program will try to find a `config.toml` file in the same folder the program is
executed in and otherwise in the regarding operation systems XDG config directory
("$HOME/.config/rhio/config.toml" on Linux).
-p, --bind-port <PORT>
Bind port of rhio node
-k, --private-key <PATH>
Path to file containing hexadecimal-encoded Ed25519 private key
-b, --blobs-dir <PATH>
Path to file-system blob store to temporarily load blobs into when importing to MinIO database.
WARNING: When left empty, an in-memory blob store is used instead which might lead to data corruption
as blob hashes are not kept between restarts. Use the in-memory store only for testing purposes.
-l, --log-level <LEVEL>
Set log verbosity. Use this for learning more about how your node behaves or for debugging.
Possible log levels are: ERROR, WARN, INFO, DEBUG, TRACE. They are scoped to "rhio" by default.
If you want to adjust the scope for deeper inspection use a filter value, for example
"=TRACE" for logging _everything_ or "rhio=INFO,async_nats=DEBUG" etc.
-h, --help
Print help (see a summary with '-h')
-V, --version
Print version
```

### Publish

#### Messages

rhio does not create or publish any messages by itself and serves merely as a "router" coordinating streams inside and outside the cluster. To publish messages into the stream the regular NATS Core or JetStream API is used. Other processes inside the cluster can independently publish messages to the NATS Server which will then be automatically picked up, processed and forwarded to other nodes by rhio.

Messages need to be encoded based on the p2panda [Operation](https://p2panda.org/specifications/namakemono/#operations) specification and contain the custom rhio headers.

With `rhio-client` library (Rust and Python) it is possible to create messages encoded in the right format. You can also use the interactive demo to send messages to any NATS server for the given subject:

```bash
rhio-client --subject foo.bar --endpoint localhost:4222
# Type: any message which should be received by all nodes ..
```

#### Blobs

Large files of any size can be imported into the local MinIO database and then announced on the network for other nodes to download them into their regarding MinIO databases. For this to take place in an efficient manner, the blob needs to be first encoded in the bao format. The resulting hash of this process can be used as a unique identifier to announce the blob on the network.

1. Instruct rhio to import and encode a file from the file system into the MinIO database. Send its path by publishing to the NATS Core subject `rhio.import`. The resulting hash is displayed in the server's logs. A [reply subject](https://docs.nats.io/nats-concepts/core-nats/reqreply) can also be specified for the resulting hash to be sent to.
```bash
nats request rhio.import /home/user/images/sloth.jpg
```
2. Publish a message which announces the blob hash on the network. Other peers will be made aware of this new blob and request to download it from the node. See "Messages" to understand how to publish these messages or use the interactive demo instead:
```bash
rhio-client --subject foo.bar --endpoint localhost:4222
# Type: blob <hash>
```

### Stream

rhio does not offer any direct APIs to subscribe to message streams. To consume data the regular NATS JetStream API is used and messages need to be validated as they are encoded in the p2panda Operation format.

## Publish
## Development

### Messages
### Prerequisites

rhio does not create or publish any messages by itself and serves merely as an "router" coordinating streams inside and outside the cluster. To publish messages into the stream the regular NATS JetStream API is used. Other processes inside the cluster can independently publish messages to the NATS Server which will then be automatically picked up and processed by rhio.
* Rust 1.80.1+
* [NATS Server](https://docs.nats.io/running-a-nats-service/introduction) with [JetStream](https://docs.nats.io/running-a-nats-service/configuration/resource_management) enabled
* [NATS Command Line Tool](https://docs.nats.io/using-nats/nats-tools/nats_cli)
* [MinIO](https://min.io/download)

## Blobs
### Installation and running

`TODO`
1. Launch `rhio` node
```bash
# Run with default configurations
cargo run

## Stream
# Pass additional arguments to `rhio`
cargo run -- --config config.toml
```
2. Configure log level
```bash
# Enable additional logging
cargo run -- --log-level "DEBUG"

rhio does not offer any direct APIs to subscribe to message streams. To consume data the regular NATS JetStream API is used.
# Enable logging for specific target
cargo run -- --log-level "async_nats=DEBUG"

`TODO`
# Enable logging for _all_ targets
cargo run -- --log-level "=TRACE"
```
3. Launch `rhio-client` demo client
```bash
cargo run --bin rhio-client -- --subject foo.bar
```
4. Run tests, linters and format checkers
```bash
cargo test
cargo clippy
cargo fmt
```
5. Build `rhio` for production
```bash
cargo build --release
```
21 changes: 0 additions & 21 deletions docker-compose.yml

This file was deleted.

16 changes: 13 additions & 3 deletions rhio-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use anyhow::Result;
use async_nats::jetstream::Context;
use p2panda_core::PrivateKey;
use p2panda_core::{Hash, PrivateKey};
use p2panda_store::MemoryStore;
use rhio_core::{create_operation, encode_operation, LogId, RhioExtensions};
use rhio_core::{
create_blob_announcement, create_message, encode_operation, LogId, RhioExtensions,
};

pub struct Client {
jetstream: JetStream,
Expand All @@ -28,7 +30,15 @@ impl Client {
}

pub async fn publish(&mut self, subject: String, payload: &[u8]) -> Result<()> {
let operation = create_operation(&mut self.store, &self.private_key, &subject, payload)?;
let operation = create_message(&mut self.store, &self.private_key, &subject, payload)?;
let encoded_operation = encode_operation(operation.header, operation.body)?;
self.jetstream.publish(subject, encoded_operation).await?;
Ok(())
}

pub async fn announce_blob(&mut self, subject: String, hash: Hash) -> Result<()> {
let operation =
create_blob_announcement(&mut self.store, &self.private_key, &subject, hash)?;
let encoded_operation = encode_operation(operation.header, operation.body)?;
self.jetstream.publish(subject, encoded_operation).await?;
Ok(())
Expand Down
22 changes: 18 additions & 4 deletions rhio-client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::str::FromStr;

use anyhow::{Context, Result};
use clap::Parser;
use p2panda_core::Hash;
use rhio_client::Client;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -34,10 +37,21 @@ async fn main() -> Result<()> {

loop {
tokio::select! {
Some(payload) = line_rx.recv() => {
client
.publish(args.subject.clone(), payload.as_bytes())
.await?;
Some(line) = line_rx.recv() => {
// If user writes a string, starting with "blob" (4 characters), followed by a
// space (1 character) and then ending with a hex-encoded BLAKE3 hash (64
// characters) and 1 control character (CR), then we interpret this as a blob
// announcement!
if line.len() == 4 + 1 + 64 + 1 && line.to_lowercase().starts_with("blob") {
let hash = Hash::from_str(&line[5..69])?;
client
.announce_blob(args.subject.clone(), hash)
.await?;
} else {
client
.publish(args.subject.clone(), line.as_bytes())
.await?;
}
}
_ = tokio::signal::ctrl_c() => {
break;
Expand Down
15 changes: 14 additions & 1 deletion rhio-core/src/extensions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use p2panda_core::Extension;
use p2panda_core::{Extension, Hash};
use serde::{Deserialize, Serialize};

/// NATS "subject" which are similar to p2panda or Kafka "topics".
Expand All @@ -12,11 +12,24 @@ pub type Subject = String;

#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct RhioExtensions {
/// Mandatory field containing the NATS subject.
#[serde(rename = "s")]
pub subject: Option<Subject>,

/// Optional field for messages which announce new blobs in the network, identified by this
/// hash. p2panda peers will connect to other nodes and replicate the blob on receipt.
#[serde(rename = "b", skip_serializing_if = "Option::is_none")]
pub blob_hash: Option<Hash>,
}

impl Extension<Subject> for RhioExtensions {
fn extract(&self) -> Option<Subject> {
self.subject.clone()
}
}

impl Extension<Hash> for RhioExtensions {
fn extract(&self) -> Option<Hash> {
self.blob_hash
}
}
5 changes: 4 additions & 1 deletion rhio-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ pub mod topic_id;

pub use extensions::{RhioExtensions, Subject};
pub use log_id::LogId;
pub use operation::{create_operation, decode_operation, encode_operation, ingest_operation};
pub use operation::{
create_blob_announcement, create_message, create_operation, decode_operation, encode_operation,
ingest_operation,
};
pub use private_key::{generate_ephemeral_private_key, generate_or_load_private_key};
pub use topic_id::TopicId;
Loading

0 comments on commit 2d8b1d4

Please sign in to comment.