diff --git a/crates/sync/src/metrics.rs b/crates/sync/src/metrics.rs index fb396f0f..fe2df1fa 100644 --- a/crates/sync/src/metrics.rs +++ b/crates/sync/src/metrics.rs @@ -4,6 +4,7 @@ use metrics::{ log_counter, log_counter_with_labels, log_distribution, + log_distribution_with_labels, register_convex_counter, register_convex_histogram, StaticMetricLabel, @@ -32,9 +33,9 @@ pub fn connect_timer() -> StatusTimer { register_convex_histogram!( SYNC_HANDLE_MESSAGE_SECONDS, "Time to handle a websocket message", - &["status", "endpoint"] + &["status", "endpoint", "instance_name"] ); -pub fn handle_message_timer(message: &ClientMessage) -> StatusTimer { +pub fn handle_message_timer(message: &ClientMessage, instance_name: String) -> StatusTimer { let mut timer = StatusTimer::new(&SYNC_HANDLE_MESSAGE_SECONDS); let request_name = match message { ClientMessage::Authenticate { .. } => "Authenticate", @@ -45,25 +46,30 @@ pub fn handle_message_timer(message: &ClientMessage) -> StatusTimer { ClientMessage::Event { .. } => "Event", }; timer.add_label(StaticMetricLabel::new("endpoint", request_name.to_owned())); + timer.add_label(StaticMetricLabel::new("instance_name", instance_name)); timer } register_convex_histogram!( SYNC_UPDATE_QUERIES_SECONDS, "Time to update queries", - &STATUS_LABEL + &[STATUS_LABEL[0], "instance_name"] ); -pub fn update_queries_timer() -> StatusTimer { - StatusTimer::new(&SYNC_UPDATE_QUERIES_SECONDS) +pub fn update_queries_timer(instance_name: String) -> StatusTimer { + let mut timer = StatusTimer::new(&SYNC_UPDATE_QUERIES_SECONDS); + timer.add_label(StaticMetricLabel::new("instance_name", instance_name)); + timer } register_convex_histogram!( SYNC_MUTATION_QUEUE_SECONDS, "Time between a mutation entering and exiting the single threaded sync worker queue", - &STATUS_LABEL + &[STATUS_LABEL[0], "instance_name"] ); -pub fn mutation_queue_timer() -> StatusTimer { - StatusTimer::new(&SYNC_MUTATION_QUEUE_SECONDS) +pub fn mutation_queue_timer(instance_name: String) -> StatusTimer { + let mut timer = StatusTimer::new(&SYNC_MUTATION_QUEUE_SECONDS); + timer.add_label(StaticMetricLabel::new("instance_name", instance_name)); + timer } register_convex_counter!(SYNC_QUERY_FAILED_TOTAL, "Number of query failures"); @@ -108,17 +114,27 @@ pub fn log_connect(last_close_reason: String, connection_count: u32) { register_convex_histogram!( SYNC_LINEARIZABILITY_DELAY_SECONDS, "How far behind the current backend is behind what the client has observed", + &["instance_name"] ); -pub fn log_linearizability_violation(delay_secs: f64) { - log_distribution(&SYNC_LINEARIZABILITY_DELAY_SECONDS, delay_secs); +pub fn log_linearizability_violation(delay_secs: f64, instance_name: String) { + log_distribution_with_labels( + &SYNC_LINEARIZABILITY_DELAY_SECONDS, + delay_secs, + vec![StaticMetricLabel::new("instance_name", instance_name)], + ); } register_convex_histogram!( SYNC_PROCESS_CLIENT_MESSAGE_SECONDS, "Delay between receiving a client message over the web socket and processing it", + &["instance_name"] ); -pub fn log_process_client_message_delay(delay: Duration) { - log_distribution(&SYNC_PROCESS_CLIENT_MESSAGE_SECONDS, delay.as_secs_f64()); +pub fn log_process_client_message_delay(delay: Duration, instance_name: String) { + log_distribution_with_labels( + &SYNC_PROCESS_CLIENT_MESSAGE_SECONDS, + delay.as_secs_f64(), + vec![StaticMetricLabel::new("instance_name", instance_name)], + ); } register_convex_histogram!( diff --git a/crates/sync/src/worker.rs b/crates/sync/src/worker.rs index 36001a15..b2a6ff96 100644 --- a/crates/sync/src/worker.rs +++ b/crates/sync/src/worker.rs @@ -298,7 +298,9 @@ impl SyncWorker { }; self.handle_message(message).await?; let delay = self.rt.monotonic_now() - received_time; - metrics::log_process_client_message_delay(delay); + metrics::log_process_client_message_delay( + delay, self.host.instance_name.clone(), + ); None }, // TODO(presley): If I swap this with futures below, tests break. @@ -409,7 +411,7 @@ impl SyncWorker { } async fn handle_message(&mut self, message: ClientMessage) -> anyhow::Result<()> { - let timer = metrics::handle_message_timer(&message); + let timer = metrics::handle_message_timer(&message, self.host.instance_name.clone()); match message { ClientMessage::Connect { session_id, @@ -435,6 +437,7 @@ impl SyncWorker { // but lets keep it as server one for now. metrics::log_linearizability_violation( max_observed_timestamp.secs_since_f64(latest_timestamp), + self.host.instance_name.clone(), ); anyhow::bail!( "Client has observed a timestamp {max_observed_timestamp:?} ahead of \ @@ -481,7 +484,7 @@ impl SyncWorker { }); let rt = self.rt.clone(); let client_version = self.config.client_version.clone(); - let timer = mutation_queue_timer(); + let timer = mutation_queue_timer(self.host.instance_name.clone()); let api = self.api.clone(); let host = self.host.clone(); let caller = FunctionCaller::SyncWorker(client_version); @@ -669,7 +672,7 @@ impl SyncWorker { new_ts: Timestamp, subscriptions_client: Arc, ) -> anyhow::Result>> { - let timer = metrics::update_queries_timer(); + let timer = metrics::update_queries_timer(self.host.instance_name.clone()); let current_version = self.state.current_version(); let (modifications, new_query_version, pending_identity, new_identity_version) =