Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add server side events to notify the clients #346

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ya-relay-core = { workspace = true }
actix-rt = "2.7"
serde = { version = "1.0.192", features = ["derive"] }
rmp-serde = "1"
serde_json = "1.0.128"
serde_bytes = "0.11.12"
anyhow = "1.0"
chrono = "0.4"
Expand All @@ -30,6 +31,7 @@ lazy_static = "1.4.0"
dashmap = "4.0.2"

tokio = { version = "1", features = ["net", "sync", "macros", "time", "rt", "io-util"] }
tokio-stream = "0.1.16"
tokio-util = { version = "0.7", features = ["codec"] }
hex = "0.4.3"
parking_lot = "0.12.1"
Expand All @@ -38,7 +40,10 @@ quick_cache = "0.4.0"

tiny-keccak = "2"
actix-web = { version = "4.4.0", default-features = false, features = ["macros"] }
actix-web-lab = "0.22.0"
cfg-if = "1.0.0"
futures-util = "0.3.30"
actix-cors = "0.5" # Add this line

[target."cfg(unix)".dependencies]
libc = "0.2"
Expand All @@ -52,7 +57,7 @@ features = [

[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread"] }
tokio-stream = "0.1.8"
tokio-stream = "0.1.16"
test-case = "3.1"
ethsign = "0.8.0"
test-log = "0.2.13"
Expand Down
33 changes: 28 additions & 5 deletions server/src/bin/ya-relay-server.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,33 @@
use actix_web::get;
use actix_web::web;
use actix_web::Responder;
use actix_web_lab::sse::Sse;
use clap::Parser;
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use std::future;
use std::net::SocketAddr;
use std::sync::Arc;

use actix_web::{get, web, Responder};
use serde::{Deserialize, Serialize};

use std::time::Duration;
use ya_relay_core::NodeId;
use ya_relay_server::metrics::register_metrics;
use ya_relay_server::sse::SseClients;
use ya_relay_server::{AddrStatus, Config, Selector, SessionManager};

#[get("/sse")]
async fn new_sse_client(sse_clients: web::Data<Arc<SseClients>>) -> impl Responder {
// Add a new client and get the receiver stream
let sse_stream = sse_clients.add_client().await;

// Map the `Event` stream to `Result<Event, Infallible>`
let result_stream = sse_stream.map(|event| Ok::<_, Infallible>(event));

// Return the SSE stream to the client
Sse::from_stream(result_stream).with_keep_alive(Duration::from_secs(10))
}

#[get("/sessions")]
async fn sessions_list(sm: web::Data<Arc<SessionManager>>) -> impl Responder {
format!("sessions: {}", sm.num_sessions())
Expand Down Expand Up @@ -82,21 +99,27 @@ async fn main() -> anyhow::Result<()> {

let args = Config::parse();

let sse_clients = Arc::new(SseClients::new());

let handle = register_metrics();

let server = ya_relay_server::run(&args).await?;
let server = ya_relay_server::run(&args, sse_clients.clone()).await?;

let sessions = web::Data::new(server.sessions());

let sse_clients_clone = web::Data::new(sse_clients.clone());

let web_server = actix_web::HttpServer::new(move || {
use actix_web::*;

let handle = handle.clone();

App::new()
.app_data(sessions.clone())
.app_data(sse_clients_clone.clone())
.service(nodes_list_prefix)
.service(sessions_list)
.service(new_sse_client)
.route("/", web::get().to(move || future::ready(handle.render())))
})
.workers(1)
Expand Down
1 change: 1 addition & 0 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
mod config;
pub mod metrics;
mod server;
pub mod sse;
mod state;
#[cfg(feature = "test-utils")]
pub mod testing;
Expand Down
77 changes: 66 additions & 11 deletions server/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::AddrStatus;
use metrics::Counter;
use quick_cache::sync::Cache;
use rand::{thread_rng, Rng};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};

use metrics::Counter;
use quick_cache::sync::Cache;
use rand::{thread_rng, Rng};
use tokio_util::codec::Decoder;

use ya_relay_core::challenge;
Expand Down Expand Up @@ -41,6 +41,7 @@ mod state_decoder;

mod ip_checker;

use crate::sse::{NodeInfo, SseClients, SseMessage};
pub use ip_checker::IpCheckerConfig;
pub use session::SessionHandlerConfig;

Expand Down Expand Up @@ -109,7 +110,7 @@ impl Drop for Server {
}
}

pub async fn run(config: &Config) -> anyhow::Result<Server> {
pub async fn run(config: &Config, sse_clients: Arc<SseClients>) -> anyhow::Result<Server> {
let bind_addr: SocketAddr = config.server.address;

let slot_manager = config
Expand Down Expand Up @@ -159,10 +160,12 @@ pub async fn run(config: &Config) -> anyhow::Result<Server> {
let server = {
let session_manager = session_manager.clone();
let slot_manager = slot_manager.clone();
let sse_clients = sse_clients.clone();

UdpServerBuilder::new(move |reply: Rc<UdpSocket>| {
let session_manager = session_manager.clone();
let slot_manager = slot_manager.clone();
let sse_clients = sse_clients.clone();
let checker_ip = reply.local_addr()?.ip();

let session_handler = session::SessionHandler::new(&session_manager, &session_handler_config);
Expand Down Expand Up @@ -222,7 +225,37 @@ pub async fn run(config: &Config) -> anyhow::Result<Server> {

match request {
request::Kind::Session(session) => {
session_handler.handle(&clock, src, request_id, session_id, &session)
let response = session_handler.handle(&clock, src, request_id, session_id, &session);
if let Some((_, Packet { session_id, .. })) = &response {
if let Ok(session_id) = SessionId::try_from(session_id.clone()) {
let sse_clients_clone = Arc::clone(&sse_clients);
let session_manager_clone = session_manager.clone();
let broadcast_future = async move {
// Add a small delay to allow the session to be fully registered
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
if let Some(session_ref) = session_manager_clone.session(&session_id) {
let msg = SseMessage {
status: "connected".to_string(),
node: NodeInfo {
id: session_ref.node_id.to_string(),
peer: session_ref.peer.to_string(),
seen: format!("{:?}", session_ref.ts.age()),
addr_status: match &*session_ref.addr_status.lock() {
AddrStatus::Unknown => "Unknown".to_owned(),
AddrStatus::Pending(ts) => format!("pending({:?})", ts.elapsed()),
AddrStatus::Invalid(ts) => format!("invalid({:?})", ts.elapsed()),
AddrStatus::Valid(ts) => format!("valid({:?})", ts.elapsed()),
},
},
};
let json = serde_json::to_string(&msg).unwrap();
sse_clients_clone.broadcast(&json).await;
}
};
tokio::spawn(broadcast_future);
}
}
response
}
request::Kind::Ping(_) => {
session_id.and_then(|session_id| handle_ping(&clock, src, request_id, session_id, &session_manager))
Expand Down Expand Up @@ -257,11 +290,33 @@ pub async fn run(config: &Config) -> anyhow::Result<Server> {
}))
}))
}) => {
let session_id: Option<SessionId> = session_id.try_into().ok();
if let Some(session_id) = session_id {
session_manager.remove_session(&session_id);
log::debug!(target: "request:disconnect", "[{src}] session {session_id} disconnected");
}
let session_id: Option<SessionId> = session_id.try_into().ok();
if let Some(session_id) = session_id {
if let Some(session_ref) = session_manager.session(&session_id) {
let sse_clients_clone = Arc::clone(&sse_clients);
let broadcast_future = async move {
let msg = SseMessage {
status: "disconnected".to_string(),
node: NodeInfo {
id: session_ref.node_id.to_string(),
peer: session_ref.peer.to_string(),
seen: format!("{:?}", session_ref.ts.age()),
addr_status: match &*session_ref.addr_status.lock() {
AddrStatus::Unknown => "Unknown".to_owned(),
AddrStatus::Pending(ts) => format!("pending({:?})", ts.elapsed()),
AddrStatus::Invalid(ts) => format!("invalid({:?})", ts.elapsed()),
AddrStatus::Valid(ts) => format!("valid({:?})", ts.elapsed()),
},
},
};
let json = serde_json::to_string(&msg).unwrap();
sse_clients_clone.broadcast(&json).await;
};
tokio::spawn(broadcast_future);
}
session_manager.remove_session(&session_id);
log::debug!(target: "request:disconnect", "[{src}] session {session_id} disconnected");
}
None
}
PacketKind::Packet(Packet { session_id: _, kind: Some(packet::Kind::Control(Control { kind: Some(control::Kind::ResumeForwarding(_)) })) }) => {
Expand Down
56 changes: 56 additions & 0 deletions server/src/sse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use actix_web_lab::sse;
use log::info;
use serde::Serialize;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[derive(Serialize)]
pub struct SseMessage {
pub(crate) status: String,
pub(crate) node: NodeInfo,
}

#[derive(Serialize)]
pub struct NodeInfo {
pub(crate) id: String,
pub(crate) peer: String,
pub(crate) seen: String,
#[serde(rename = "addrStatus")]
pub(crate) addr_status: String,
}

#[derive(Debug, Clone, Default)]
pub struct SseClients {
clients: Arc<Mutex<Vec<mpsc::Sender<sse::Event>>>>,
}

impl SseClients {
pub fn new() -> Self {
SseClients {
clients: Arc::new(Mutex::new(Vec::new())),
}
}

pub async fn add_client(&self) -> ReceiverStream<sse::Event> {
let (tx, rx) = mpsc::channel(10);

// Send a "connected" message to the new client
tx.send(sse::Data::new("connected").into()).await.unwrap();

// Add the sender to the list of clients
self.clients.lock().unwrap().push(tx);
info!("New SSE connection established");

// Return the receiver stream to be used for SSE
ReceiverStream::new(rx)
}

pub async fn broadcast(&self, msg: &str) {
let clients = self.clients.lock().unwrap().clone();
let send_futures = clients
.iter()
.map(|client| client.send(sse::Data::new(msg).into()));
let _ = futures_util::future::join_all(send_futures).await;
}
}
5 changes: 4 additions & 1 deletion server/src/testing/server.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::config::Config;

use crate::server::{IpCheckerConfig, Server, ServerConfig, SessionHandlerConfig};
use crate::sse::SseClients;
use crate::SessionManagerConfig;
use futures::future::LocalBoxFuture;
use futures::FutureExt;
use std::rc::Rc;
use std::sync::Arc;
use std::{future, net};
use tokio::time::Duration;
use ya_relay_core::testing::TestServerWrapper;
Expand Down Expand Up @@ -34,7 +36,8 @@ pub async fn init_test_server() -> anyhow::Result<ServerWrapper> {
}

pub async fn init_test_server_with_config(config: Config) -> anyhow::Result<ServerWrapper> {
let server = Rc::new(crate::run(&config).await?);
let sse_clients = Arc::new(SseClients::new());
let server = Rc::new(crate::run(&config, sse_clients.clone()).await?);

Ok(ServerWrapper { server })
}
Expand Down