Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Glue together NATS JetStream, MinIO blob store and p2panda gossip #58

Merged
merged 43 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
94ac45d
Clarify that gossip is _not_ taking place during init
adzialocha Aug 28, 2024
ef228ec
Glue together NATS JetStream and p2panda message streams
adzialocha Aug 28, 2024
c92a2d3
Process blob announcements
adzialocha Aug 28, 2024
b75ba8e
Realized that we need to define the topic manually
adzialocha Aug 28, 2024
8e01dbb
Remove unused fields
adzialocha Aug 28, 2024
537b0bd
Clippy and remove unused code
adzialocha Aug 28, 2024
8d3c4c8
Define streams in config.toml file
adzialocha Aug 28, 2024
3c707a9
Update p2panda dependencies
adzialocha Aug 29, 2024
c0180c8
Do not serialize blob header when not set
adzialocha Aug 29, 2024
f3e3ea2
Fix getting len-prefixed data by wrongly using to_bytes
adzialocha Aug 29, 2024
421a906
Add more context to errors
adzialocha Aug 29, 2024
c5c9922
When only setting level for tracing, still scope it to rhio
adzialocha Aug 29, 2024
7a3d31a
Wait until initialised before entering gossip
adzialocha Aug 29, 2024
32faa6b
Do allow duplicate operations arriving via nats
adzialocha Aug 29, 2024
7d9593c
Have a control channel to ask rhio to import a file into minio
adzialocha Aug 29, 2024
c9b62da
rhio-client can announce blob hashes now as well
adzialocha Aug 29, 2024
1698730
Import paths are encoded as regular strings
adzialocha Aug 29, 2024
0dc7878
Do not check for given credentials to handle minio
adzialocha Aug 29, 2024
7224d25
Clean up error handling, add a comment
adzialocha Aug 29, 2024
f4674cd
Fix issue detecting blob announcements in rhio-client
adzialocha Aug 29, 2024
b583449
Show successful blob sync in logs
adzialocha Aug 29, 2024
47746c2
Make clippy happy
adzialocha Aug 29, 2024
c48df08
Add issues to TODOs
adzialocha Aug 29, 2024
628e54d
Remove TODO
adzialocha Aug 29, 2024
4f6876b
Update README.md
adzialocha Aug 29, 2024
8c4d08c
Nicer config.example.toml file
adzialocha Aug 29, 2024
876b5ea
Update READMEs
adzialocha Aug 29, 2024
5cc9e12
Remove docker-compose.yaml for now
adzialocha Aug 29, 2024
711ab7b
One more README.md update
adzialocha Aug 29, 2024
fcdd9ff
Typos
adzialocha Aug 29, 2024
07ddf18
Rename panda module to network
adzialocha Aug 29, 2024
de782d8
Remove all unused dependencies
adzialocha Aug 29, 2024
d9a33c4
Write more about development
adzialocha Aug 30, 2024
0ef9c12
Fix setting the filter string for tracing
adzialocha Aug 30, 2024
fa21e24
Improve README.md
adzialocha Aug 30, 2024
938b228
Grammar fixes
adzialocha Aug 30, 2024
f810b8b
Update README.md
adzialocha Aug 30, 2024
2fdc485
Update README.md
adzialocha Aug 30, 2024
161a1fc
Make clippy happy
adzialocha Aug 30, 2024
5934cc7
We can not wait for ack from publishing to NATS core
adzialocha Aug 30, 2024
5736a8b
Silently fail when ready oneshot channel got dropped
adzialocha Aug 30, 2024
8c35aa2
Update rhio/config.example.toml
adzialocha Aug 30, 2024
2209169
Clarify some things
adzialocha Aug 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 80 additions & 245 deletions Cargo.lock

Large diffs are not rendered by default.

152 changes: 142 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,27 +1,159 @@
# rhio

> ℹ️ rhio is currently in an experimental development phase and (not yet) intended to be used in production setups. See the [issue tracker](https://github.com/HIRO-MicroDataCenters-BV/rhio/issues) for missing features.

rhio is a peer-to-peer message stream and blob storage solution allowing processes to rapidly exchange messages and efficiently replicate large blobs without any centralised coordination.

rhio has been designed to be integrated into a Kubernetes cluster where _internal_ cluster messaging and persistence is handled centrally via [NATS JetStream](https://docs.nats.io/nats-concepts/jetstream) while _external_ cluster messaging is decentralised and handled via [p2panda](https://p2panda.org). Blobs of any size are replicated separately with efficient [bao encoding](https://github.com/oconnor663/bao/tree/master) and stored in a [MinIO](https://min.io/) database.
rhio has been designed to be integrated into a Kubernetes cluster where _internal_ cluster messaging and persistence is handled centrally via [NATS JetStream](https://docs.nats.io/nats-concepts/jetstream) while _external_ cluster messaging is decentralised and handled via [p2panda](https://p2panda.org). Blobs of any size are replicated separately with efficient [bao encoding](https://github.com/oconnor663/bao) and stored in a [MinIO](https://min.io/) database.

Similar to NATS JetStream, any number of streams can be subscribed to and filtered by "subjects".

<details>
adzialocha marked this conversation as resolved.
Show resolved Hide resolved
<summary>Show diagram</summary>

```
.. other clusters ..

▲ │
Cluster │ │
┌──────────────────────────────────────────────────────────┼──┼──────┐
│ │ │ │
│ ┌─────────────────┐ ┌──────────┐ Publish ┌───┼──▼───┐ │
│ │ ┼─────────► ┼─────────────► │ │
│ │ .. other │ │ NATS │ │ rhio │ │
│ │ processes .. │ │ Server │ Subscribe │ p2p node │ │
│ │ ◄─────────┼ ◄─────────────┼ │ │
│ └─────────────────┘ └──────────┘ └───▲──┬───┘ │
│ │ │ │
│ │ │ │
│ ┌────┼──▼────┐ │
│ │ MinIO S3 │ │
│ │ Blob Store │ │
│ └────────────┘ │
└────────────────────────────────────────────────────────────────────┘
```
</details>

## Usage

`TODO`
1. Copy the [configuration file](/rhio/config.example.toml) and adjust it to your setup: `cp config.example.toml config.toml`
2. Run the `rhio` process via `rhio -c config.toml`
3. The process can be further configured via ENV vars or command line arguments:

```
Peer-to-peer message and blob streaming with MinIO and NATS JetStream support

Usage: rhio [OPTIONS]

Options:
-c, --config <PATH>
Path to "config.toml" file for further configuration.

When not set the program will try to find a `config.toml` file in the same folder the program is
executed in and otherwise in the regarding operation systems XDG config directory
("$HOME/.config/rhio/config.toml" on Linux).

-p, --bind-port <PORT>
Bind port of rhio node

-k, --private-key <PATH>
Path to file containing hexadecimal-encoded Ed25519 private key

-b, --blobs-dir <PATH>
Path to file-system blob store to temporarily load blobs into when importing to MinIO database.

WARNING: When left empty, an in-memory blob store is used instead which might lead to data corruption
as blob hashes are not kept between restarts. Use the in-memory store only for testing purposes.

-l, --log-level <LEVEL>
Set log verbosity. Use this for learning more about how your node behaves or for debugging.

Possible log levels are: ERROR, WARN, INFO, DEBUG, TRACE. They are scoped to "rhio" by default.

If you want to adjust the scope for deeper inspection use a filter value, for example
"=TRACE" for logging _everything_ or "rhio=INFO,async_nats=DEBUG" etc.

-h, --help
Print help (see a summary with '-h')

-V, --version
Print version
```

### Publish

#### Messages

rhio does not create or publish any messages by itself and serves merely as a "router" coordinating streams inside and outside the cluster. To publish messages into the stream the regular NATS Core or JetStream API is used. Other processes inside the cluster can independently publish messages to the NATS Server which will then be automatically picked up, processed and forwarded to other nodes by rhio.

Messages need to be encoded based on the p2panda [Operation](https://p2panda.org/specifications/namakemono/#operations) specification and contain the custom rhio headers.

With `rhio-client` library (Rust and Python) it is possible to create messages encoded in the right format. You can also use the interactive demo to send messages to any NATS server for the given subject:

```bash
rhio-client --subject foo.bar --endpoint localhost:4222
# Type: any message which should be received by all nodes ..
```

#### Blobs

Large files of any size can be imported into the local MinIO database and then announced on the network for other nodes to download them into their regarding MinIO databases. For this to take place in an efficient manner, the blob needs to be first encoded in the bao format. The resulting hash of this process can be used as a unique identifier to announce the blob on the network.

1. Instruct rhio to import and encode a file from the file system into the MinIO database. Send its path by publishing to the NATS Core subject `rhio.import`. The resulting hash is displayed in the server's logs. A [reply subject](https://docs.nats.io/nats-concepts/core-nats/reqreply) can also be specified for the resulting hash to be sent to.
```bash
nats request rhio.import /home/user/images/sloth.jpg
```
2. Publish a message which announces the blob hash on the network. Other peers will be made aware of this new blob and request to download it from the node. See "Messages" to understand how to publish these messages or use the interactive demo instead:
```bash
rhio-client --subject foo.bar --endpoint localhost:4222
# Type: blob <hash>
```

### Stream

rhio does not offer any direct APIs to subscribe to message streams. To consume data the regular NATS JetStream API is used and messages need to be validated as they are encoded in the p2panda Operation format.

## Publish
## Development

### Messages
### Prerequisites

rhio does not create or publish any messages by itself and serves merely as an "router" coordinating streams inside and outside the cluster. To publish messages into the stream the regular NATS JetStream API is used. Other processes inside the cluster can independently publish messages to the NATS Server which will then be automatically picked up and processed by rhio.
* Rust 1.80.1+
* [NATS Server](https://docs.nats.io/running-a-nats-service/introduction) with [JetStream](https://docs.nats.io/running-a-nats-service/configuration/resource_management) enabled
* [NATS Command Line Tool](https://docs.nats.io/using-nats/nats-tools/nats_cli)
* [MinIO](https://min.io/download)

## Blobs
### Installation and running

`TODO`
adzialocha marked this conversation as resolved.
Show resolved Hide resolved
1. Launch `rhio` node
```bash
# Run with default configurations
cargo run

## Stream
# Pass additional arguments to `rhio`
cargo run -- --config config.toml
```
2. Configure log level
```bash
# Enable additional logging
cargo run -- --log-level "DEBUG"

rhio does not offer any direct APIs to subscribe to message streams. To consume data the regular NATS JetStream API is used.
# Enable logging for specific target
cargo run -- --log-level "async_nats=DEBUG"

`TODO`
# Enable logging for _all_ targets
cargo run -- --log-level "=TRACE"
```
3. Launch `rhio-client` demo client
```bash
cargo run --bin rhio-client -- --subject foo.bar
```
4. Run tests, linters and format checkers
```bash
cargo test
cargo clippy
cargo fmt
```
5. Build `rhio` for production
```bash
cargo build --release
```
21 changes: 0 additions & 21 deletions docker-compose.yml

This file was deleted.

16 changes: 13 additions & 3 deletions rhio-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use anyhow::Result;
use async_nats::jetstream::Context;
use p2panda_core::PrivateKey;
use p2panda_core::{Hash, PrivateKey};
use p2panda_store::MemoryStore;
use rhio_core::{create_operation, encode_operation, LogId, RhioExtensions};
use rhio_core::{
create_blob_announcement, create_message, encode_operation, LogId, RhioExtensions,
};

pub struct Client {
jetstream: JetStream,
Expand All @@ -28,7 +30,15 @@ impl Client {
}

pub async fn publish(&mut self, subject: String, payload: &[u8]) -> Result<()> {
let operation = create_operation(&mut self.store, &self.private_key, &subject, payload)?;
let operation = create_message(&mut self.store, &self.private_key, &subject, payload)?;
let encoded_operation = encode_operation(operation.header, operation.body)?;
self.jetstream.publish(subject, encoded_operation).await?;
Ok(())
}

pub async fn announce_blob(&mut self, subject: String, hash: Hash) -> Result<()> {
let operation =
create_blob_announcement(&mut self.store, &self.private_key, &subject, hash)?;
let encoded_operation = encode_operation(operation.header, operation.body)?;
self.jetstream.publish(subject, encoded_operation).await?;
Ok(())
Expand Down
22 changes: 18 additions & 4 deletions rhio-client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::str::FromStr;

use anyhow::{Context, Result};
use clap::Parser;
use p2panda_core::Hash;
use rhio_client::Client;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -34,10 +37,21 @@ async fn main() -> Result<()> {

loop {
tokio::select! {
Some(payload) = line_rx.recv() => {
client
.publish(args.subject.clone(), payload.as_bytes())
.await?;
Some(line) = line_rx.recv() => {
// If user writes a string, starting with "blob" (4 characters), followed by a
// space (1 character) and then ending with a hex-encoded BLAKE3 hash (64
// characters) and 1 control character (CR), then we interpret this as a blob
// announcement!
if line.len() == 4 + 1 + 64 + 1 && line.to_lowercase().starts_with("blob") {
let hash = Hash::from_str(&line[5..69])?;
client
.announce_blob(args.subject.clone(), hash)
.await?;
} else {
client
.publish(args.subject.clone(), line.as_bytes())
.await?;
}
}
_ = tokio::signal::ctrl_c() => {
break;
Expand Down
15 changes: 14 additions & 1 deletion rhio-core/src/extensions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use p2panda_core::Extension;
use p2panda_core::{Extension, Hash};
use serde::{Deserialize, Serialize};

/// NATS "subject" which are similar to p2panda or Kafka "topics".
Expand All @@ -12,11 +12,24 @@ pub type Subject = String;

#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct RhioExtensions {
/// Mandatory field containing the NATS subject.
#[serde(rename = "s")]
pub subject: Option<Subject>,

/// Optional field for messages which announce new blobs in the network, identified by this
/// hash. p2panda peers will connect to other nodes and replicate the blob on receipt.
#[serde(rename = "b", skip_serializing_if = "Option::is_none")]
pub blob_hash: Option<Hash>,
}

impl Extension<Subject> for RhioExtensions {
fn extract(&self) -> Option<Subject> {
self.subject.clone()
}
}

impl Extension<Hash> for RhioExtensions {
fn extract(&self) -> Option<Hash> {
self.blob_hash
}
}
5 changes: 4 additions & 1 deletion rhio-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ pub mod topic_id;

pub use extensions::{RhioExtensions, Subject};
pub use log_id::LogId;
pub use operation::{create_operation, decode_operation, encode_operation, ingest_operation};
pub use operation::{
create_blob_announcement, create_message, create_operation, decode_operation, encode_operation,
ingest_operation,
};
pub use private_key::{generate_ephemeral_private_key, generate_or_load_private_key};
pub use topic_id::TopicId;
Loading