diff --git a/Cargo.lock b/Cargo.lock index 6cda843..15bccbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -116,9 +116,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.5.0" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" +checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" dependencies = [ "anstyle", "anstyle-parse", @@ -130,15 +130,15 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84bf0a05bbb2a83e5eb6fa36bb6e87baa08193c35ff52bbf6b38d8af2890e46" +checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] name = "anstyle-parse" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" +checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140" dependencies = [ "utf8parse", ] @@ -154,9 +154,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "2.1.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" +checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" dependencies = [ "anstyle", "windows-sys", @@ -515,6 +515,21 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-server" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "447f28c85900215cc1bea282f32d4a2f22d55c5a300afdfbc661c8d6a632e063" +dependencies = [ + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "tokio", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -871,9 +886,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.5" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "824956d0dca8334758a5b7f7e50518d66ea319330cbceedcf76905c2f6ab30e3" +checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" dependencies = [ "clap_builder", "clap_derive 4.4.2", @@ -881,9 +896,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.5" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "122ec64120a49b4563ccaedcbea7818d069ed8e9aa6d829b82d8a4128936b2ab" +checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" dependencies = [ "anstream", "anstyle", @@ -1016,9 +1031,9 @@ dependencies = [ [[package]] name = "const-hex" -version = "1.9.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa72a10d0e914cad6bcad4e7409e68d230c1c2db67896e19a37f758b1fcbdab5" +checksum = "c37be52ef5e3b394db27a2341010685ad5103c72ac15ce2e9420a7e8f93f342c" dependencies = [ "cfg-if", "cpufeatures", @@ -1333,7 +1348,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.0", + "hashbrown 0.14.1", "lock_api", "once_cell", "parking_lot_core", @@ -1536,7 +1551,7 @@ checksum = "a4b1e0c257a9e9f25f90ff76d7a68360ed497ee519c8e428d1825ef0000799d4" dependencies = [ "der 0.7.8", "digest", - "elliptic-curve 0.13.5", + "elliptic-curve 0.13.6", "rfc6979 0.4.0", "signature 2.1.0", "spki 0.7.2", @@ -1570,9 +1585,9 @@ dependencies = [ [[package]] name = "elliptic-curve" -version = "0.13.5" +version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "968405c8fdc9b3bf4df0a6638858cc0b52462836ab6b1c87377785dd09cf1c0b" +checksum = "d97ca172ae9dc9f9b779a6e3a65d308f2af74e5b8c921299075bdb4a0370e914" dependencies = [ "base16ct 0.2.0", "crypto-bigint 0.5.3", @@ -1661,9 +1676,9 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd" +checksum = "add4f07d43996f76ef320709726a556a9d4f965d9410d8d0271132d2f8293480" dependencies = [ "errno-dragonfly", "libc", @@ -1879,7 +1894,7 @@ dependencies = [ "cargo_metadata 0.17.0", "chrono", "const-hex", - "elliptic-curve 0.13.5", + "elliptic-curve 0.13.6", "ethabi", "generic-array", "k256 0.13.1", @@ -2000,7 +2015,7 @@ dependencies = [ "coins-bip32", "coins-bip39", "const-hex", - "elliptic-curve 0.13.5", + "elliptic-curve 0.13.6", "eth-keystore", "ethers-core 2.0.10", "rand", @@ -2337,14 +2352,14 @@ dependencies = [ [[package]] name = "graphcast-sdk" -version = "0.4.3" -source = "git+https://github.com/graphops/graphcast-sdk#6746c9a2d6cf48afd3e1ffa7e5dd3a5553d0c8d3" +version = "0.5.0-alpha.1" +source = "git+https://github.com/graphops/graphcast-sdk#ce9f5d3293329a52cc19d84a8b035c3f3747976f" dependencies = [ "anyhow", "async-graphql", "async-graphql-axum", "chrono", - "clap 4.4.5", + "clap 4.4.6", "data-encoding", "derive-getters", "dotenv", @@ -2493,9 +2508,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.0" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" [[package]] name = "hashers" @@ -2764,12 +2779,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.1" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad227c3af19d4914570ad36d30409928b75967c298feb9ea1969db3a610bb14e" +checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.0", + "hashbrown 0.14.1", ] [[package]] @@ -2893,7 +2908,7 @@ checksum = "cadb76004ed8e97623117f3df85b17aaa6626ab0b0831e6573f104df16cd1bcc" dependencies = [ "cfg-if", "ecdsa 0.16.8", - "elliptic-curve 0.13.5", + "elliptic-curve 0.13.6", "once_cell", "sha2", "signature 2.1.0", @@ -2966,9 +2981,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a9bad9f94746442c783ca431b22403b519cd7fbeed0533fdd6328b2f2212128" +checksum = "3852614a3bd9ca9804678ba6be5e3b8ce76dfc902cae004e3e0c44051b6e88db" [[package]] name = "lock_api" @@ -3028,9 +3043,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.6.3" +version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" [[package]] name = "memoffset" @@ -3703,7 +3718,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap 2.0.1", + "indexmap 2.0.2", ] [[package]] @@ -3976,10 +3991,11 @@ dependencies = [ [[package]] name = "prometheus-http-query" -version = "0.6.6" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7970fd6e91b5cb87e9a093657572a896d133879ced7752d2c7635beae29eaba0" +checksum = "6704e3a7a78545b1496524d518658005a6cc308abc90ce5fccf01891ecdc298b" dependencies = [ + "mime", "reqwest", "serde", "serde_json", @@ -4146,13 +4162,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "ebee201405406dbf528b8b672104ae6d6d63e6d118cb10e4d51abbc7b58044ff" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.8", + "regex-automata 0.3.9", "regex-syntax 0.7.5", ] @@ -4167,9 +4183,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "59b23e92ee4318893fa3fe3e6fb365258efbfe6ac6ab30f090cdcbb7aa37efa9" dependencies = [ "aho-corasick", "memchr", @@ -4190,9 +4206,9 @@ checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" [[package]] name = "reqwest" -version = "0.11.20" +version = "0.11.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" +checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ "base64 0.21.4", "bytes", @@ -4219,6 +4235,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "system-configuration", "tokio", "tokio-native-tls", "tokio-rustls", @@ -4340,9 +4357,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.14" +version = "0.38.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "747c788e9ce8e92b12cd485c49ddf90723550b654b32508f979b71a7b1ecda4f" +checksum = "f25469e9ae0f3d0047ca8b93fc56843f38e6774f0914a107ff8b41be8be8e0b7" dependencies = [ "bitflags 2.4.0", "errno", @@ -4754,9 +4771,9 @@ dependencies = [ [[package]] name = "sharded-slab" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1b21f559e07218024e7e9f90f96f601825397de0e25420135f7f952453fed0b" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" dependencies = [ "lazy_static", ] @@ -5081,9 +5098,10 @@ dependencies = [ "async-trait", "autometrics", "axum 0.5.17", + "axum-server", "cargo-husky", "chrono", - "clap 4.4.5", + "clap 4.4.6", "confy", "criterion", "derive-getters", @@ -5183,6 +5201,27 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "take_mut" version = "0.2.2" @@ -5600,7 +5639,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.0.1", + "indexmap 2.0.2", "serde", "serde_spanned", "toml_datetime", @@ -5996,9 +6035,9 @@ checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" [[package]] name = "waku-bindings" -version = "0.3.0" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cbf825d80777c831d36f030029f883c5dd0fc255d0025896c67b4ba14c5ad9a" +checksum = "c3c52764c1cde43ad4e233ad23b18bbba11f9179a6a4e9b60c660d4f22450289" dependencies = [ "aes-gcm", "base64 0.21.4", @@ -6018,9 +6057,9 @@ dependencies = [ [[package]] name = "waku-sys" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e52e75616474f736edf5483b54448d0f9726d84cf29d07f4be5aaebb47c8749" +checksum = "a23174a6c97644142b87cfb11e1b80b972a1d79b9260ec8ad2896775cbd45a99" dependencies = [ "bindgen", ] diff --git a/subgraph-radio/Cargo.toml b/subgraph-radio/Cargo.toml index 6dc2e0b..039f256 100644 --- a/subgraph-radio/Cargo.toml +++ b/subgraph-radio/Cargo.toml @@ -11,6 +11,7 @@ categories = ["network-programming", "web-programming::http-client"] [dependencies] graphcast-sdk = { workspace = true } +axum-server = "0.5.1" prost = "0.11" once_cell = "1.17" chrono = "0.4" diff --git a/subgraph-radio/src/lib.rs b/subgraph-radio/src/lib.rs index b8be013..8298d00 100644 --- a/subgraph-radio/src/lib.rs +++ b/subgraph-radio/src/lib.rs @@ -1,16 +1,21 @@ use async_graphql::{Error, ErrorExtensions}; use autometrics::autometrics; +use axum_server::Handle; +use derive_getters::Getters; use once_cell::sync::OnceCell; use std::{ collections::HashMap, + process, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, + thread::sleep, + time::Duration, }; use tokio::signal; -use tracing::error; +use tracing::{debug, error, info}; use graphcast_sdk::{ graphcast_agent::GraphcastAgent, graphql::client_network::query_network_subgraph, @@ -96,7 +101,7 @@ pub fn chainhead_block_str( } /// Graceful shutdown when receive signal -pub async fn shutdown_signal(running_program: Arc) { +pub async fn shutdown(control: ControlFlow) { let ctrl_c = async { signal::ctrl_c() .await @@ -115,12 +120,27 @@ pub async fn shutdown_signal(running_program: Arc) { let terminate = std::future::pending::<()>(); tokio::select! { - _ = ctrl_c => {println!("Shutting down server...");}, - _ = terminate => {}, + _ = ctrl_c => { + info!("Ctrl+C received! Shutting down..."); + } + _ = terminate => { + info!("SIGTERM received! Shutting down..."); + } } + // Set running boolean to false + debug!("Finish the current running processes..."); + control.running.store(false, Ordering::SeqCst); + // Signal the server to shutdown using Handle. + control + .metrics_handle + .graceful_shutdown(Some(Duration::from_secs(1))); + control + .server_handle + .graceful_shutdown(Some(Duration::from_secs(3))); - running_program.store(false, Ordering::SeqCst); - opentelemetry::global::shutdown_tracer_provider(); + sleep(Duration::from_secs(5)); + debug!("Allowed 5 seconds for graceful shutdown, force exit"); + process::exit(1); } #[derive(Debug, thiserror::Error)] @@ -160,6 +180,38 @@ impl ErrorExtensions for OperationError { } } +/// Aggregated control flow configurations +#[derive(Getters, Debug, Clone)] +pub struct ControlFlow { + running: Arc, + skip_iteration: Arc, + metrics_handle: Handle, + server_handle: Handle, +} + +impl ControlFlow { + /// Create basic control flow settings + fn new() -> Self { + let running = Arc::new(AtomicBool::new(true)); + let skip_iteration = Arc::new(AtomicBool::new(false)); + + let metrics_handle = Handle::new(); + let server_handle = Handle::new(); + + //TODO: Test for effectiveness + // let iteration_timeout = Duration::from_micros(1); + // let update_timeout = Duration::from_secs(5); + // let gossip_timeout = Duration::from_nanos(1); + + ControlFlow { + running, + skip_iteration, + metrics_handle, + server_handle, + } + } +} + #[cfg(test)] mod tests { use crate::messages::poi::PublicPoiMessage; diff --git a/subgraph-radio/src/main.rs b/subgraph-radio/src/main.rs index dbf483c..a23a9ec 100644 --- a/subgraph-radio/src/main.rs +++ b/subgraph-radio/src/main.rs @@ -23,8 +23,8 @@ async fn main() { // Initialization and pass in for static lifetime throughout the program let radio_operator = RadioOperator::new(&radio_config, agent).await; - // Start separate processes - radio_operator.prepare(receiver).await; + // Start message processes + radio_operator.message_processor(receiver).await; _ = RADIO_OPERATOR.set(radio_operator); diff --git a/subgraph-radio/src/metrics/mod.rs b/subgraph-radio/src/metrics/mod.rs index 222cd66..7ace88f 100644 --- a/subgraph-radio/src/metrics/mod.rs +++ b/subgraph-radio/src/metrics/mod.rs @@ -2,11 +2,11 @@ use autometrics::{encode_global_metrics, global_metrics_exporter}; use axum::http::StatusCode; use axum::routing::get; use axum::Router; +use axum_server::Handle; use once_cell::sync::Lazy; use prometheus::{core::Collector, Registry}; use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts}; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; + use std::{net::SocketAddr, str::FromStr}; use tracing::{debug, info}; @@ -151,22 +151,22 @@ pub async fn get_metrics() -> (StatusCode, String) { /// Run the API server as well as Prometheus and a traffic generator #[allow(dead_code)] -pub async fn handle_serve_metrics(host: String, port: u16, _running_program: Arc) { +pub async fn handle_serve_metrics(host: String, port: u16, handle: Handle) { // Set up the exporter to collect metrics let _exporter = global_metrics_exporter(); let app = Router::new().route("/metrics", get(get_metrics)); let addr = SocketAddr::from_str(&format!("{}:{}", host, port)).expect("Start Prometheus metrics"); - let server = axum::Server::bind(&addr); + // let server = axum::Server::bind(&addr); info!( address = addr.to_string(), "Prometheus Metrics port exposed" ); - server + axum_server::bind(addr) + .handle(handle) .serve(app.into_make_service()) - // .with_graceful_shutdown(shutdown_signal(running_program)) .await - .expect("Error starting Prometheus metrics service"); + .expect("Error starting Prometheus metrics service") } diff --git a/subgraph-radio/src/operator/mod.rs b/subgraph-radio/src/operator/mod.rs index d095789..b46d605 100644 --- a/subgraph-radio/src/operator/mod.rs +++ b/subgraph-radio/src/operator/mod.rs @@ -1,11 +1,6 @@ -use std::sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::Receiver, - Arc, -}; +use std::sync::{atomic::Ordering, mpsc::Receiver, Arc}; use std::time::Duration; -use derive_getters::Getters; use graphcast_sdk::{ graphcast_agent::{ message_typing::check_message_validity, @@ -19,7 +14,6 @@ use graphcast_sdk::{ use tokio::time::{interval, sleep, timeout}; use tracing::{debug, error, info, trace, warn}; -use crate::config::Config; use crate::messages::upgrade::UpgradeIntentMessage; use crate::metrics::handle_serve_metrics; use crate::operator::attestation::log_gossip_summary; @@ -36,6 +30,7 @@ use crate::{ }, operator::{attestation::ComparisonResultType, indexer_management::health_query}, }; +use crate::{config::Config, shutdown, ControlFlow}; use self::notifier::Notifier; @@ -45,50 +40,6 @@ pub mod indexer_management; pub mod notifier; pub mod operation; -/// Aggregated control flow configurations -/// Not used currently -#[derive(Getters)] -#[allow(unused)] -struct ControlFlow { - running: Arc, - skip_iteration: Arc, - iteration_timeout: Duration, - update_timeout: Duration, - gossip_timeout: Duration, - topic_update_duration: Duration, - state_update_duration: Duration, - gossip_poi_duration: Duration, - comparison_duration: Duration, -} - -impl ControlFlow { - fn new() -> Self { - let running = Arc::new(AtomicBool::new(true)); - let skip_iteration = Arc::new(AtomicBool::new(false)); - - let topic_update_duration = Duration::from_secs(600); - let state_update_duration = Duration::from_secs(15); - let gossip_poi_duration = Duration::from_secs(30); - let comparison_duration = Duration::from_secs(60); - - let iteration_timeout = Duration::from_secs(180); - let update_timeout = Duration::from_secs(10); - let gossip_timeout = Duration::from_secs(150); - - ControlFlow { - running, - skip_iteration, - iteration_timeout, - update_timeout, - gossip_timeout, - topic_update_duration, - state_update_duration, - gossip_poi_duration, - comparison_duration, - } - } -} - /// Radio operator contains all states needed for radio operations #[allow(unused)] pub struct RadioOperator { @@ -120,46 +71,41 @@ impl RadioOperator { .expect("Failed to validate the provided indexer management server endpoint"); }; let notifier = Notifier::from_config(config); + let control_flow = ControlFlow::new(); - RadioOperator { - config: config.clone(), - persisted_state, - graphcast_agent, - notifier, - control_flow: ControlFlow::new(), - } - } + // Spawn a task to gracefully shutdown + tokio::spawn(shutdown(control_flow.clone())); - /// Preparation for running the radio applications - /// Expose metrics and subscribe to graphcast topics - pub async fn prepare(&self, receiver: Receiver) { // Set up Prometheus metrics url if configured - if let Some(port) = self.config.radio_infrastructure().metrics_port { + if let Some(port) = config.radio_infrastructure().metrics_port { debug!("Initializing metrics port"); tokio::spawn(handle_serve_metrics( - self.config.radio_infrastructure().metrics_host.clone(), + config.radio_infrastructure().metrics_host.clone(), port, - self.control_flow.running.clone(), + control_flow.metrics_handle.clone(), )); } // Provide generated topics to Graphcast agent - let topics = self - .config + let topics = config .generate_topics( - &self.config.radio_infrastructure().coverage, - &self.config.graph_stack.indexer_address, + &config.radio_infrastructure().coverage, + &config.graph_stack.indexer_address, ) .await; debug!( topics = tracing::field::debug(&topics), "Found content topics for subscription", ); - self.graphcast_agent - .update_content_topics(topics.clone()) - .await; + graphcast_agent.update_content_topics(topics.clone()).await; - self.message_processor(receiver).await; + RadioOperator { + config: config.clone(), + persisted_state, + graphcast_agent, + notifier, + control_flow, + } } pub fn graphcast_agent(&self) -> &GraphcastAgent { @@ -174,11 +120,6 @@ impl RadioOperator { /// Radio operations pub async fn run(&'static self) { // Control flow - // TODO: expose to radio config for the users - let running = Arc::new(AtomicBool::new(true)); - let skip_iteration = Arc::new(AtomicBool::new(false)); - let skip_iteration_clone = skip_iteration.clone(); - let mut topic_update_interval = interval(Duration::from_secs( self.config.radio_infrastructure.topic_update_interval, )); @@ -198,24 +139,30 @@ impl RadioOperator { // Separate thread to skip a main loop iteration when hit timeout tokio::spawn(async move { tokio::time::sleep(iteration_timeout).await; - skip_iteration_clone.store(true, Ordering::SeqCst); + self.control_flow + .skip_iteration + .store(true, Ordering::SeqCst); }); // Initialize Http server with graceful shutdown if configured if self.config.radio_infrastructure().server_port.is_some() { let state_ref = &self.persisted_state; let config_cloned = self.config.clone(); - tokio::spawn(run_server(config_cloned, state_ref, running.clone())); + tokio::spawn(run_server( + config_cloned, + state_ref, + self.control_flow.server_handle.clone(), + )); } // Main loop for sending messages, can factor out // and take radio specific query and parsing for radioPayload - while running.load(Ordering::SeqCst) { + while self.control_flow.running.load(Ordering::SeqCst) { // Run event intervals sequentially by satisfication of other intervals and corresponding tick tokio::select! { _ = topic_update_interval.tick() => { - if skip_iteration.load(Ordering::SeqCst) { - skip_iteration.store(false, Ordering::SeqCst); + if self.control_flow.skip_iteration.load(Ordering::SeqCst) { + self.control_flow.skip_iteration.store(false, Ordering::SeqCst); continue; } // Update topic subscription @@ -233,8 +180,8 @@ impl RadioOperator { } }, _ = state_update_interval.tick() => { - if skip_iteration.load(Ordering::SeqCst) { - skip_iteration.store(false, Ordering::SeqCst); + if self.control_flow.skip_iteration.load(Ordering::SeqCst) { + self.control_flow.skip_iteration.store(false, Ordering::SeqCst); continue; } // Update the number of peers connected @@ -253,8 +200,8 @@ impl RadioOperator { }); }, _ = gossip_poi_interval.tick() => { - if skip_iteration.load(Ordering::SeqCst) { - skip_iteration.store(false, Ordering::SeqCst); + if self.control_flow.skip_iteration.load(Ordering::SeqCst) { + self.control_flow.skip_iteration.store(false, Ordering::SeqCst); continue; } @@ -319,8 +266,8 @@ impl RadioOperator { } }, _ = comparison_interval.tick() => { - if skip_iteration.load(Ordering::SeqCst) { - skip_iteration.store(false, Ordering::SeqCst); + if self.control_flow.skip_iteration.load(Ordering::SeqCst) { + self.control_flow.skip_iteration.store(false, Ordering::SeqCst); continue; } diff --git a/subgraph-radio/src/server/mod.rs b/subgraph-radio/src/server/mod.rs index 0081225..8c3cfcf 100644 --- a/subgraph-radio/src/server/mod.rs +++ b/subgraph-radio/src/server/mod.rs @@ -1,7 +1,8 @@ -use axum::{extract::Extension, routing::get, Router, Server}; +use axum::{extract::Extension, routing::get, Router}; +use axum_server::Handle; use std::net::SocketAddr; use std::str::FromStr; -use std::sync::atomic::AtomicBool; + use std::sync::Arc; use tracing::{debug, info}; @@ -21,11 +22,7 @@ pub mod routes; /// Set up the routes for a radio health endpoint at `/health` /// and a versioned GraphQL endpoint at `api/v1/graphql` /// This function starts a API server at the configured server_host and server_port -pub async fn run_server( - config: Config, - persisted_state: &'static PersistedState, - _running_program: Arc, -) { +pub async fn run_server(config: Config, persisted_state: &'static PersistedState, handle: Handle) { if config.radio_infrastructure().server_port.is_none() { return; } @@ -55,9 +52,10 @@ pub async fn run_server( host = tracing::field::debug(&config.radio_infrastructure().server_host), port, "Bind port to service" ); - Server::bind(&addr) + + axum_server::bind(addr) + .handle(handle) .serve(app.into_make_service()) - // .with_graceful_shutdown(shutdown_signal(running_program)) .await .expect("Error starting API service"); }