Skip to content

Commit

Permalink
Add instance_name to selected sync worker stats. (#28754)
Browse files Browse the repository at this point in the history
With moving sync worker to Usher it is useful to be able to track some metrics per instance.

GitOrigin-RevId: 7c1380c342f8fe07db06a3507e84bd7da64b20e2
  • Loading branch information
preslavle authored and Convex, Inc. committed Aug 6, 2024
1 parent 79f96cb commit e6fef13
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 16 deletions.
40 changes: 28 additions & 12 deletions crates/sync/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use metrics::{
log_counter,
log_counter_with_labels,
log_distribution,
log_distribution_with_labels,
register_convex_counter,
register_convex_histogram,
StaticMetricLabel,
Expand Down Expand Up @@ -32,9 +33,9 @@ pub fn connect_timer() -> StatusTimer {
register_convex_histogram!(
SYNC_HANDLE_MESSAGE_SECONDS,
"Time to handle a websocket message",
&["status", "endpoint"]
&["status", "endpoint", "instance_name"]
);
pub fn handle_message_timer(message: &ClientMessage) -> StatusTimer {
pub fn handle_message_timer(message: &ClientMessage, instance_name: String) -> StatusTimer {
let mut timer = StatusTimer::new(&SYNC_HANDLE_MESSAGE_SECONDS);
let request_name = match message {
ClientMessage::Authenticate { .. } => "Authenticate",
Expand All @@ -45,25 +46,30 @@ pub fn handle_message_timer(message: &ClientMessage) -> StatusTimer {
ClientMessage::Event { .. } => "Event",
};
timer.add_label(StaticMetricLabel::new("endpoint", request_name.to_owned()));
timer.add_label(StaticMetricLabel::new("instance_name", instance_name));
timer
}

register_convex_histogram!(
SYNC_UPDATE_QUERIES_SECONDS,
"Time to update queries",
&STATUS_LABEL
&[STATUS_LABEL[0], "instance_name"]
);
pub fn update_queries_timer() -> StatusTimer {
StatusTimer::new(&SYNC_UPDATE_QUERIES_SECONDS)
pub fn update_queries_timer(instance_name: String) -> StatusTimer {
let mut timer = StatusTimer::new(&SYNC_UPDATE_QUERIES_SECONDS);
timer.add_label(StaticMetricLabel::new("instance_name", instance_name));
timer
}

register_convex_histogram!(
SYNC_MUTATION_QUEUE_SECONDS,
"Time between a mutation entering and exiting the single threaded sync worker queue",
&STATUS_LABEL
&[STATUS_LABEL[0], "instance_name"]
);
pub fn mutation_queue_timer() -> StatusTimer {
StatusTimer::new(&SYNC_MUTATION_QUEUE_SECONDS)
pub fn mutation_queue_timer(instance_name: String) -> StatusTimer {
let mut timer = StatusTimer::new(&SYNC_MUTATION_QUEUE_SECONDS);
timer.add_label(StaticMetricLabel::new("instance_name", instance_name));
timer
}

register_convex_counter!(SYNC_QUERY_FAILED_TOTAL, "Number of query failures");
Expand Down Expand Up @@ -108,17 +114,27 @@ pub fn log_connect(last_close_reason: String, connection_count: u32) {
register_convex_histogram!(
SYNC_LINEARIZABILITY_DELAY_SECONDS,
"How far behind the current backend is behind what the client has observed",
&["instance_name"]
);
pub fn log_linearizability_violation(delay_secs: f64) {
log_distribution(&SYNC_LINEARIZABILITY_DELAY_SECONDS, delay_secs);
pub fn log_linearizability_violation(delay_secs: f64, instance_name: String) {
log_distribution_with_labels(
&SYNC_LINEARIZABILITY_DELAY_SECONDS,
delay_secs,
vec![StaticMetricLabel::new("instance_name", instance_name)],
);
}

register_convex_histogram!(
SYNC_PROCESS_CLIENT_MESSAGE_SECONDS,
"Delay between receiving a client message over the web socket and processing it",
&["instance_name"]
);
pub fn log_process_client_message_delay(delay: Duration) {
log_distribution(&SYNC_PROCESS_CLIENT_MESSAGE_SECONDS, delay.as_secs_f64());
pub fn log_process_client_message_delay(delay: Duration, instance_name: String) {
log_distribution_with_labels(
&SYNC_PROCESS_CLIENT_MESSAGE_SECONDS,
delay.as_secs_f64(),
vec![StaticMetricLabel::new("instance_name", instance_name)],
);
}

register_convex_histogram!(
Expand Down
11 changes: 7 additions & 4 deletions crates/sync/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,9 @@ impl<RT: Runtime> SyncWorker<RT> {
};
self.handle_message(message).await?;
let delay = self.rt.monotonic_now() - received_time;
metrics::log_process_client_message_delay(delay);
metrics::log_process_client_message_delay(
delay, self.host.instance_name.clone(),
);
None
},
// TODO(presley): If I swap this with futures below, tests break.
Expand Down Expand Up @@ -409,7 +411,7 @@ impl<RT: Runtime> SyncWorker<RT> {
}

async fn handle_message(&mut self, message: ClientMessage) -> anyhow::Result<()> {
let timer = metrics::handle_message_timer(&message);
let timer = metrics::handle_message_timer(&message, self.host.instance_name.clone());
match message {
ClientMessage::Connect {
session_id,
Expand All @@ -435,6 +437,7 @@ impl<RT: Runtime> SyncWorker<RT> {
// but lets keep it as server one for now.
metrics::log_linearizability_violation(
max_observed_timestamp.secs_since_f64(latest_timestamp),
self.host.instance_name.clone(),
);
anyhow::bail!(
"Client has observed a timestamp {max_observed_timestamp:?} ahead of \
Expand Down Expand Up @@ -481,7 +484,7 @@ impl<RT: Runtime> SyncWorker<RT> {
});
let rt = self.rt.clone();
let client_version = self.config.client_version.clone();
let timer = mutation_queue_timer();
let timer = mutation_queue_timer(self.host.instance_name.clone());
let api = self.api.clone();
let host = self.host.clone();
let caller = FunctionCaller::SyncWorker(client_version);
Expand Down Expand Up @@ -669,7 +672,7 @@ impl<RT: Runtime> SyncWorker<RT> {
new_ts: Timestamp,
subscriptions_client: Arc<dyn SubscriptionClient>,
) -> anyhow::Result<impl Future<Output = anyhow::Result<TransitionState>>> {
let timer = metrics::update_queries_timer();
let timer = metrics::update_queries_timer(self.host.instance_name.clone());
let current_version = self.state.current_version();

let (modifications, new_query_version, pending_identity, new_identity_version) =
Expand Down

0 comments on commit e6fef13

Please sign in to comment.