Skip to content

Commit

Permalink
endpoint health (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci committed Apr 8, 2024
1 parent f6f076d commit cdbdd9b
Show file tree
Hide file tree
Showing 11 changed files with 763 additions and 190 deletions.
1 change: 1 addition & 0 deletions benches/bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ fn config() -> Config {
format!("ws://{}", SERVER_TWO_ENDPOINT),
],
shuffle_endpoints: false,
health_check: None,
}),
server: Some(ServerConfig {
listen_address: SUBWAY_SERVER_ADDR.to_string(),
Expand Down
8 changes: 8 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ extensions:
endpoints:
- wss://acala-rpc.dwellir.com
- wss://acala-rpc-0.aca-api.network
health_check:
interval_sec: 10 # check interval, default is 10s
healthy_response_time_ms: 500 # max response time to be considered healthy, default is 500ms
health_method: system_health
response: # response contains { isSyncing: false }
!contains
- - isSyncing
- !eq false
event_bus:
substrate_api:
stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes
Expand Down
12 changes: 12 additions & 0 deletions eth_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,18 @@ extensions:
client:
endpoints:
- wss://eth-rpc-karura-testnet.aca-staging.network
health_check:
interval_sec: 10 # check interval, default is 10s
healthy_response_time_ms: 500 # max response time to be considered healthy, default is 500ms
health_method: net_health # eth-rpc-adapter bodhijs
response: # response contains { isHealthy: true, isRPCOK: true }
!contains
- - isHealthy
- !eq true
- - isRPCOK
- !eq true
# health_method: eth_syncing # eth node
# response: !eq false
event_bus:
eth_api:
stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes
Expand Down
169 changes: 169 additions & 0 deletions src/extensions/client/endpoint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
use super::health::{Event, Health};
use crate::{
extensions::client::{get_backoff_time, HealthCheckConfig},
utils::errors,
};
use jsonrpsee::{
async_client::Client,
core::client::{ClientT, Subscription, SubscriptionClientT},
ws_client::WsClientBuilder,
};
use std::{
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::Duration,
};

pub struct Endpoint {
url: String,
health: Arc<Health>,
client_rx: tokio::sync::watch::Receiver<Option<Arc<Client>>>,
on_client_ready: Arc<tokio::sync::Notify>,
background_tasks: Vec<tokio::task::JoinHandle<()>>,
}

impl Drop for Endpoint {
fn drop(&mut self) {
self.background_tasks.drain(..).for_each(|handle| handle.abort());
}
}

impl Endpoint {
pub fn new(
url: String,
request_timeout: Option<Duration>,
connection_timeout: Option<Duration>,
health_config: HealthCheckConfig,
) -> Self {
let (client_tx, client_rx) = tokio::sync::watch::channel(None);
let on_client_ready = Arc::new(tokio::sync::Notify::new());
let health = Arc::new(Health::new(url.clone(), health_config));

let url_ = url.clone();
let health_ = health.clone();
let on_client_ready_ = on_client_ready.clone();

// This task will try to connect to the endpoint and keep the connection alive
let connection_task = tokio::spawn(async move {
let connect_backoff_counter = Arc::new(AtomicU32::new(0));

loop {
tracing::info!("Connecting endpoint: {url_}");

let client = WsClientBuilder::default()
.request_timeout(request_timeout.unwrap_or(Duration::from_secs(30)))
.connection_timeout(connection_timeout.unwrap_or(Duration::from_secs(30)))
.max_buffer_capacity_per_subscription(2048)
.max_concurrent_requests(2048)
.max_response_size(20 * 1024 * 1024)
.build(&url_);

match client.await {
Ok(client) => {
let client = Arc::new(client);
health_.update(Event::ConnectionSuccessful);
_ = client_tx.send(Some(client.clone()));
on_client_ready_.notify_waiters();
tracing::info!("Endpoint connected: {url_}");
connect_backoff_counter.store(0, Ordering::Relaxed);
client.on_disconnect().await;
}
Err(err) => {
health_.on_error(&err);
_ = client_tx.send(None);
tracing::warn!("Unable to connect to endpoint: {url_} error: {err}");
tokio::time::sleep(get_backoff_time(&connect_backoff_counter)).await;
}
}
// Wait a second before trying to reconnect
tokio::time::sleep(Duration::from_secs(1)).await;
}
});

// This task will check the health of the endpoint and update the health score
let health_checker = Health::monitor(health.clone(), client_rx.clone(), on_client_ready.clone());

Self {
url,
health,
client_rx,
on_client_ready,
background_tasks: vec![connection_task, health_checker],
}
}

pub fn url(&self) -> &str {
&self.url
}

pub fn health(&self) -> &Health {
self.health.as_ref()
}

pub async fn connected(&self) {
if self.client_rx.borrow().is_some() {
return;
}
self.on_client_ready.notified().await;
}

pub async fn request(
&self,
method: &str,
params: Vec<serde_json::Value>,
timeout: Duration,
) -> Result<serde_json::Value, jsonrpsee::core::Error> {
let client = self
.client_rx
.borrow()
.clone()
.ok_or(errors::failed("client not connected"))?;

match tokio::time::timeout(timeout, client.request(method, params.clone())).await {
Ok(Ok(response)) => Ok(response),
Ok(Err(err)) => {
self.health.on_error(&err);
Err(err)
}
Err(_) => {
tracing::error!("request timed out method: {method} params: {params:?}");
self.health.on_error(&jsonrpsee::core::Error::RequestTimeout);
Err(jsonrpsee::core::Error::RequestTimeout)
}
}
}

pub async fn subscribe(
&self,
subscribe_method: &str,
params: Vec<serde_json::Value>,
unsubscribe_method: &str,
timeout: Duration,
) -> Result<Subscription<serde_json::Value>, jsonrpsee::core::Error> {
let client = self
.client_rx
.borrow()
.clone()
.ok_or(errors::failed("client not connected"))?;

match tokio::time::timeout(
timeout,
client.subscribe(subscribe_method, params.clone(), unsubscribe_method),
)
.await
{
Ok(Ok(response)) => Ok(response),
Ok(Err(err)) => {
self.health.on_error(&err);
Err(err)
}
Err(_) => {
tracing::error!("subscribe timed out subscribe: {subscribe_method} params: {params:?}");
self.health.on_error(&jsonrpsee::core::Error::RequestTimeout);
Err(jsonrpsee::core::Error::RequestTimeout)
}
}
}
}
159 changes: 159 additions & 0 deletions src/extensions/client/health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
use crate::extensions::client::HealthCheckConfig;
use jsonrpsee::{async_client::Client, core::client::ClientT};
use std::{
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::Duration,
};

#[derive(Debug)]
pub enum Event {
ResponseOk,
SlowResponse,
RequestTimeout,
ConnectionSuccessful,
ConnectionFailed,
StaleChain,
}

impl Event {
pub fn update_score(&self, current: u32) -> u32 {
u32::min(
match self {
Event::ResponseOk => current.saturating_add(2),
Event::SlowResponse => current.saturating_sub(5),
Event::RequestTimeout | Event::ConnectionFailed | Event::StaleChain => 0,
Event::ConnectionSuccessful => MAX_SCORE / 5 * 4, // 80% of max score
},
MAX_SCORE,
)
}
}

#[derive(Debug, Default)]
pub struct Health {
url: String,
config: HealthCheckConfig,
score: AtomicU32,
unhealthy: tokio::sync::Notify,
}

const MAX_SCORE: u32 = 100;
const THRESHOLD: u32 = MAX_SCORE / 2;

impl Health {
pub fn new(url: String, config: HealthCheckConfig) -> Self {
Self {
url,
config,
score: AtomicU32::new(0),
unhealthy: tokio::sync::Notify::new(),
}
}

pub fn score(&self) -> u32 {
self.score.load(Ordering::Relaxed)
}

pub fn update(&self, event: Event) {
let current_score = self.score.load(Ordering::Relaxed);
let new_score = event.update_score(current_score);
if new_score == current_score {
return;
}
self.score.store(new_score, Ordering::Relaxed);
log::trace!(
"Endpoint {:?} score updated from: {current_score} to: {new_score}",
self.url
);

// Notify waiters if the score has dropped below the threshold
if current_score >= THRESHOLD && new_score < THRESHOLD {
log::warn!("Endpoint {:?} became unhealthy", self.url);
self.unhealthy.notify_waiters();
}
}

pub fn on_error(&self, err: &jsonrpsee::core::Error) {
log::warn!("Endpoint {:?} responded with error: {err:?}", self.url);
match err {
jsonrpsee::core::Error::RequestTimeout => {
self.update(Event::RequestTimeout);
}
jsonrpsee::core::Error::Transport(_)
| jsonrpsee::core::Error::RestartNeeded(_)
| jsonrpsee::core::Error::MaxSlotsExceeded => {
self.update(Event::ConnectionFailed);
}
_ => {}
};
}

pub async fn unhealthy(&self) {
self.unhealthy.notified().await;
}
}

impl Health {
pub fn monitor(
health: Arc<Health>,
client_rx_: tokio::sync::watch::Receiver<Option<Arc<Client>>>,
on_client_ready: Arc<tokio::sync::Notify>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
// no health method
if health.config.health_method.is_none() {
return;
}

// Wait for the client to be ready before starting the health check
on_client_ready.notified().await;

let method_name = health.config.health_method.as_ref().expect("checked above");
let health_response = health.config.response.clone();
let interval = Duration::from_secs(health.config.interval_sec);
let healthy_response_time = Duration::from_millis(health.config.healthy_response_time_ms);

let client = match client_rx_.borrow().clone() {
Some(client) => client,
None => return,
};

loop {
// Wait for the next interval
tokio::time::sleep(interval).await;

let request_start = std::time::Instant::now();
match client
.request::<serde_json::Value, Vec<serde_json::Value>>(method_name, vec![])
.await
{
Ok(response) => {
let duration = request_start.elapsed();

// Check response
if let Some(ref health_response) = health_response {
if !health_response.validate(&response) {
health.update(Event::StaleChain);
continue;
}
}

// Check response time
if duration > healthy_response_time {
health.update(Event::SlowResponse);
continue;
}

health.update(Event::ResponseOk);
}
Err(err) => {
health.on_error(&err);
}
}
}
})
}
}
10 changes: 10 additions & 0 deletions src/extensions/client/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ pub async fn dummy_server() -> (
(addr, handle, rx, sub_rx)
}

pub async fn dummy_server_extend(extend: Box<dyn FnOnce(&mut TestServerBuilder)>) -> (SocketAddr, ServerHandle) {
let mut builder = TestServerBuilder::new();

extend(&mut builder);

let (addr, handle) = builder.build().await;

(addr, handle)
}

pub enum SinkTask {
Sleep(u64),
Send(JsonValue),
Expand Down
Loading

0 comments on commit cdbdd9b

Please sign in to comment.