From 95970f53c0c6b75590e80c99753a5b7a7b81961f Mon Sep 17 00:00:00 2001 From: axiomatic-aardvark Date: Thu, 18 Apr 2024 13:36:24 +0300 Subject: [PATCH] feat: differentiate between offline and allocated subgraphs --- ...6d48ae19ada62770aaf97992173bc15f0beef.json | 2 +- ...2736861030b7ef4e35d94f7d8a5051f5ba437.json | 4 +- ...c9bd2e5062d9d8fe65112b7a884d35e5d7267.json | 2 +- ...1502ea6fe1ba26800bee47af5b2e2da6ce42a.json | 4 +- ...0e63673a6d1416bd8907ac5b44b25f46bcebe.json | 2 +- ...59c3d38fd1db23d437657334120749f9e9ea2.json | 2 +- ...74af216eb2e355347672754d3be5f46ab6814.json | 4 +- ...d8c5975fcb34fa7f351341907b25dc8167b85.json | 4 +- ...9b2e9199ba2c981bf19caf4b367658043ce40.json | 4 +- ...a07b24c9a1eb36e0989203c2d83b6db9cd191.json | 2 +- grafana.json | 5 +- subgraph-radio/benches/attestations.rs | 3 +- subgraph-radio/src/database/mod.rs | 75 +++++++++++++------ subgraph-radio/src/database/test.rs | 6 +- subgraph-radio/src/messages/poi.rs | 20 +++-- subgraph-radio/src/metrics/mod.rs | 14 ++-- subgraph-radio/src/operator/attestation.rs | 33 ++++++-- subgraph-radio/src/operator/mod.rs | 38 ++++++++-- subgraph-radio/src/operator/operation.rs | 22 ++++-- subgraph-radio/src/server/model/mod.rs | 53 +++++++++---- 20 files changed, 215 insertions(+), 84 deletions(-) diff --git a/.sqlx/query-1598168faa0e38d2bd87a7aba3a6d48ae19ada62770aaf97992173bc15f0beef.json b/.sqlx/query-1598168faa0e38d2bd87a7aba3a6d48ae19ada62770aaf97992173bc15f0beef.json index e023b9a..53782ba 100644 --- a/.sqlx/query-1598168faa0e38d2bd87a7aba3a6d48ae19ada62770aaf97992173bc15f0beef.json +++ b/.sqlx/query-1598168faa0e38d2bd87a7aba3a6d48ae19ada62770aaf97992173bc15f0beef.json @@ -11,7 +11,7 @@ { "name": "nonce", "ordinal": 1, - "type_info": "Int64" + "type_info": "Text" }, { "name": "graph_account", diff --git a/.sqlx/query-1da94d33f4debe43ccbacb3e3542736861030b7ef4e35d94f7d8a5051f5ba437.json b/.sqlx/query-1da94d33f4debe43ccbacb3e3542736861030b7ef4e35d94f7d8a5051f5ba437.json index 0a08e84..3e63684 100644 --- a/.sqlx/query-1da94d33f4debe43ccbacb3e3542736861030b7ef4e35d94f7d8a5051f5ba437.json +++ b/.sqlx/query-1da94d33f4debe43ccbacb3e3542736861030b7ef4e35d94f7d8a5051f5ba437.json @@ -11,7 +11,7 @@ { "name": "nonce", "ordinal": 1, - "type_info": "Int64" + "type_info": "Text" }, { "name": "graph_account", @@ -31,7 +31,7 @@ { "name": "block_number", "ordinal": 5, - "type_info": "Int64" + "type_info": "Text" }, { "name": "block_hash", diff --git a/.sqlx/query-256874918132db5582b4f08f098c9bd2e5062d9d8fe65112b7a884d35e5d7267.json b/.sqlx/query-256874918132db5582b4f08f098c9bd2e5062d9d8fe65112b7a884d35e5d7267.json index 974a232..5c9157b 100644 --- a/.sqlx/query-256874918132db5582b4f08f098c9bd2e5062d9d8fe65112b7a884d35e5d7267.json +++ b/.sqlx/query-256874918132db5582b4f08f098c9bd2e5062d9d8fe65112b7a884d35e5d7267.json @@ -11,7 +11,7 @@ { "name": "block_number", "ordinal": 1, - "type_info": "Int64" + "type_info": "Text" }, { "name": "result_type", diff --git a/.sqlx/query-5db100e45ba48233b2bccd892181502ea6fe1ba26800bee47af5b2e2da6ce42a.json b/.sqlx/query-5db100e45ba48233b2bccd892181502ea6fe1ba26800bee47af5b2e2da6ce42a.json index 2a21f76..6025cdf 100644 --- a/.sqlx/query-5db100e45ba48233b2bccd892181502ea6fe1ba26800bee47af5b2e2da6ce42a.json +++ b/.sqlx/query-5db100e45ba48233b2bccd892181502ea6fe1ba26800bee47af5b2e2da6ce42a.json @@ -11,7 +11,7 @@ { "name": "block_number", "ordinal": 1, - "type_info": "Int64" + "type_info": "Text" }, { "name": "ppoi", @@ -21,7 +21,7 @@ { "name": "stake_weight", "ordinal": 3, - "type_info": "Int64" + "type_info": "Text" }, { "name": "sender_group_hash", diff --git a/.sqlx/query-6c712ee2fe2bcf46090649b1d950e63673a6d1416bd8907ac5b44b25f46bcebe.json b/.sqlx/query-6c712ee2fe2bcf46090649b1d950e63673a6d1416bd8907ac5b44b25f46bcebe.json index cc113c7..cb630bd 100644 --- a/.sqlx/query-6c712ee2fe2bcf46090649b1d950e63673a6d1416bd8907ac5b44b25f46bcebe.json +++ b/.sqlx/query-6c712ee2fe2bcf46090649b1d950e63673a6d1416bd8907ac5b44b25f46bcebe.json @@ -11,7 +11,7 @@ { "name": "block_number", "ordinal": 1, - "type_info": "Int64" + "type_info": "Text" }, { "name": "result_type", diff --git a/.sqlx/query-7e8a55b84c78053925be21e17e159c3d38fd1db23d437657334120749f9e9ea2.json b/.sqlx/query-7e8a55b84c78053925be21e17e159c3d38fd1db23d437657334120749f9e9ea2.json index 3457341..3637e41 100644 --- a/.sqlx/query-7e8a55b84c78053925be21e17e159c3d38fd1db23d437657334120749f9e9ea2.json +++ b/.sqlx/query-7e8a55b84c78053925be21e17e159c3d38fd1db23d437657334120749f9e9ea2.json @@ -11,7 +11,7 @@ { "name": "nonce", "ordinal": 1, - "type_info": "Int64" + "type_info": "Text" }, { "name": "graph_account", diff --git a/.sqlx/query-9307257bf723ea3b87e55d86c0974af216eb2e355347672754d3be5f46ab6814.json b/.sqlx/query-9307257bf723ea3b87e55d86c0974af216eb2e355347672754d3be5f46ab6814.json index f54d9db..e087a1e 100644 --- a/.sqlx/query-9307257bf723ea3b87e55d86c0974af216eb2e355347672754d3be5f46ab6814.json +++ b/.sqlx/query-9307257bf723ea3b87e55d86c0974af216eb2e355347672754d3be5f46ab6814.json @@ -11,7 +11,7 @@ { "name": "nonce", "ordinal": 1, - "type_info": "Int64" + "type_info": "Text" }, { "name": "graph_account", @@ -31,7 +31,7 @@ { "name": "block_number", "ordinal": 5, - "type_info": "Int64" + "type_info": "Text" }, { "name": "block_hash", diff --git a/.sqlx/query-befde8748e7ffb210f38cf47d0dd8c5975fcb34fa7f351341907b25dc8167b85.json b/.sqlx/query-befde8748e7ffb210f38cf47d0dd8c5975fcb34fa7f351341907b25dc8167b85.json index c266b53..c71cedc 100644 --- a/.sqlx/query-befde8748e7ffb210f38cf47d0dd8c5975fcb34fa7f351341907b25dc8167b85.json +++ b/.sqlx/query-befde8748e7ffb210f38cf47d0dd8c5975fcb34fa7f351341907b25dc8167b85.json @@ -11,7 +11,7 @@ { "name": "block_number", "ordinal": 1, - "type_info": "Int64" + "type_info": "Text" }, { "name": "ppoi", @@ -21,7 +21,7 @@ { "name": "stake_weight", "ordinal": 3, - "type_info": "Int64" + "type_info": "Text" }, { "name": "sender_group_hash", diff --git a/.sqlx/query-c768d9f6c074652c3bbf7c150f79b2e9199ba2c981bf19caf4b367658043ce40.json b/.sqlx/query-c768d9f6c074652c3bbf7c150f79b2e9199ba2c981bf19caf4b367658043ce40.json index 7aac38c..498f93a 100644 --- a/.sqlx/query-c768d9f6c074652c3bbf7c150f79b2e9199ba2c981bf19caf4b367658043ce40.json +++ b/.sqlx/query-c768d9f6c074652c3bbf7c150f79b2e9199ba2c981bf19caf4b367658043ce40.json @@ -11,7 +11,7 @@ { "name": "block_number", "ordinal": 1, - "type_info": "Int64" + "type_info": "Text" }, { "name": "ppoi", @@ -21,7 +21,7 @@ { "name": "stake_weight", "ordinal": 3, - "type_info": "Int64" + "type_info": "Text" }, { "name": "sender_group_hash", diff --git a/.sqlx/query-fb4a1ca07c9cda5c7ddaaf9be55a07b24c9a1eb36e0989203c2d83b6db9cd191.json b/.sqlx/query-fb4a1ca07c9cda5c7ddaaf9be55a07b24c9a1eb36e0989203c2d83b6db9cd191.json index d359e22..0be1840 100644 --- a/.sqlx/query-fb4a1ca07c9cda5c7ddaaf9be55a07b24c9a1eb36e0989203c2d83b6db9cd191.json +++ b/.sqlx/query-fb4a1ca07c9cda5c7ddaaf9be55a07b24c9a1eb36e0989203c2d83b6db9cd191.json @@ -11,7 +11,7 @@ { "name": "block_number", "ordinal": 1, - "type_info": "Int64" + "type_info": "Text" }, { "name": "result_type", diff --git a/grafana.json b/grafana.json index 2ccd40d..ad9cb49 100644 --- a/grafana.json +++ b/grafana.json @@ -494,7 +494,7 @@ "datasource": "${DS_SUBGRAPHRADIOGRAPHQL}", "endTimePath": "endTime", "groupBy": "null", - "queryText": "query {\n data:comparisonRatio{\n deployment\n blockNumber\n senderRatio\n stakeRatio\n }\n}", + "queryText": "query {\n data:comparisonRatio{\n deployment\n blockNumber\n senderRatio\n stakeRatio\n allocated\n}\n}", "refId": "A", "timePath": "Time" } @@ -512,7 +512,8 @@ "_blockNumber": "Block Number", "_deployment": "Deployment", "_senderRatio": "Count Ratio", - "_stakeRatio": "Stake Ratio" + "_stakeRatio": "Stake Ratio", + "_allocated": "Allocated" } } } diff --git a/subgraph-radio/benches/attestations.rs b/subgraph-radio/benches/attestations.rs index 56fb90e..670f8c5 100644 --- a/subgraph-radio/benches/attestations.rs +++ b/subgraph-radio/benches/attestations.rs @@ -7,7 +7,7 @@ mod attestation { use criterion::{black_box, criterion_group, Criterion}; use graphcast_sdk::graphcast_agent::message_typing::GraphcastMessage; use sqlx::SqlitePool; - use std::collections::HashMap; + use std::collections::{HashMap, HashSet}; use subgraph_radio::{ database::{insert_local_attestation, insert_remote_ppoi_message}, messages::poi::PublicPoiMessage, @@ -75,6 +75,7 @@ mod attestation { 42, black_box(&remote_attestations.clone()), "my-awesome-hash", + HashSet::new(), ) }) }); diff --git a/subgraph-radio/src/database/mod.rs b/subgraph-radio/src/database/mod.rs index 1a718ec..7f5666a 100644 --- a/subgraph-radio/src/database/mod.rs +++ b/subgraph-radio/src/database/mod.rs @@ -74,9 +74,15 @@ pub async fn get_local_attestation( let attestation = Attestation { identifier: data.identifier, - block_number: data.block_number as u64, + block_number: data + .block_number + .parse::() + .expect("Failed to parse as u64"), ppoi: data.ppoi, - stake_weight: data.stake_weight as u64, + stake_weight: data + .stake_weight + .parse::() + .expect("Failed to parse as u64"), senders, sender_group_hash: data.sender_group_hash, timestamp: timestamps, @@ -119,9 +125,15 @@ pub async fn get_local_attestations_by_identifier( full_attestations.push(Attestation { identifier: att.identifier, - block_number: att.block_number as u64, + block_number: att + .block_number + .parse::() + .expect("Failed to parse as u64"), ppoi: att.ppoi, - stake_weight: att.stake_weight as u64, + stake_weight: att + .stake_weight + .parse::() + .expect("Failed to parse as u64"), senders, sender_group_hash: att.sender_group_hash, timestamp: timestamps, @@ -158,9 +170,15 @@ pub async fn get_local_attestations(pool: &SqlitePool) -> Result() + .expect("Failed to parse as u64"), ppoi: record.ppoi, - stake_weight: record.stake_weight as u64, + stake_weight: record + .stake_weight + .parse::() + .expect("Failed to parse as u64"), sender_group_hash: record.sender_group_hash, senders, timestamp: timestamps, @@ -251,15 +269,18 @@ pub async fn get_remote_ppoi_messages( .into_iter() .map(|row| GraphcastMessage { identifier: row.identifier.clone(), - nonce: row.nonce as u64, + nonce: row.nonce.parse::().expect("Failed to parse as u64"), graph_account: row.graph_account.clone(), signature: row.signature, payload: PublicPoiMessage { identifier: row.identifier, content: row.content, - nonce: row.nonce as u64, + nonce: row.nonce.parse::().expect("Failed to parse as u64"), network: row.network, - block_number: row.block_number as u64, + block_number: row + .block_number + .parse::() + .expect("Failed to parse as u64"), block_hash: row.block_hash, graph_account: row.graph_account, }, @@ -288,15 +309,18 @@ pub async fn get_remote_ppoi_messages_by_identifier( .into_iter() .map(|row| GraphcastMessage { identifier: row.identifier.clone(), - nonce: row.nonce as u64, + nonce: row.nonce.parse::().expect("Failed to parse as u64"), graph_account: row.graph_account.clone(), signature: row.signature, payload: PublicPoiMessage { identifier: row.identifier, content: row.content, - nonce: row.nonce as u64, + nonce: row.nonce.parse::().expect("Failed to parse as u64"), network: row.network, - block_number: row.block_number as u64, + block_number: row + .block_number + .parse::() + .expect("Failed to parse as u64"), block_hash: row.block_hash, graph_account: row.graph_account, }, @@ -381,13 +405,13 @@ pub async fn get_upgrade_intent_message( deployment: msg.deployment.clone(), subgraph_id: msg.subgraph_id, new_hash: msg.new_hash, - nonce: msg.nonce as u64, + nonce: msg.nonce.parse::().expect("Failed to parse as u64"), graph_account: msg.graph_account.clone(), }; let graphcast_message = GraphcastMessage::new( msg.deployment, - msg.nonce as u64, + msg.nonce.parse::().expect("Failed to parse as u64"), msg.graph_account, payload, msg.signature, @@ -420,13 +444,13 @@ pub async fn get_upgrade_intent_message_by_id( deployment: msg.deployment.clone(), subgraph_id: msg.subgraph_id, new_hash: msg.new_hash, - nonce: msg.nonce as u64, + nonce: msg.nonce.parse::().expect("Failed to parse as u64"), graph_account: msg.graph_account.clone(), }; let graphcast_message = GraphcastMessage::new( msg.deployment, - msg.nonce as u64, + msg.nonce.parse::().expect("Failed to parse as u64"), msg.graph_account, payload, msg.signature, @@ -456,13 +480,13 @@ pub async fn get_upgrade_intent_messages( deployment: msg.deployment.clone(), subgraph_id: msg.subgraph_id, new_hash: msg.new_hash, - nonce: msg.nonce as u64, + nonce: msg.nonce.parse::().expect("Failed to parse as u64"), graph_account: msg.graph_account.clone(), }; let graphcast_message = GraphcastMessage::new( msg.deployment, - msg.nonce as u64, + msg.nonce.parse::().expect("Failed to parse as u64"), msg.graph_account, payload, msg.signature, @@ -622,7 +646,10 @@ pub async fn get_comparison_results( results.push(ComparisonResult { deployment: row.deployment, - block_number: row.block_number as u64, + block_number: row + .block_number + .parse::() + .expect("Failed to parse as u64"), result_type: ComparisonResultType::from_str(&row.result_type)?, local_attestation, attestations, @@ -659,7 +686,10 @@ pub async fn get_comparison_results_by_type( .map_err(DatabaseError::ParseError) .map(|result_type| ComparisonResult { deployment: row.deployment, - block_number: row.block_number as u64, + block_number: row + .block_number + .parse::() + .expect("Failed to parse as u64"), result_type, local_attestation, attestations, @@ -692,7 +722,10 @@ pub async fn get_comparison_results_by_deployment( Ok(ComparisonResult { deployment: row.deployment, - block_number: row.block_number as u64, + block_number: row + .block_number + .parse::() + .expect("Failed to parse as u64"), result_type: ComparisonResultType::from_str(&row.result_type)?, local_attestation, attestations, diff --git a/subgraph-radio/src/database/test.rs b/subgraph-radio/src/database/test.rs index b40cf6a..bcad122 100644 --- a/subgraph-radio/src/database/test.rs +++ b/subgraph-radio/src/database/test.rs @@ -1,5 +1,7 @@ #[cfg(test)] mod tests { + use std::collections::HashSet; + use graphcast_sdk::{graphcast_agent::message_typing::GraphcastMessage, networks::NetworkName}; use sqlx::SqlitePool; @@ -189,7 +191,7 @@ mod tests { attestations: Vec::new(), }; - let _ = handle_comparison_result(&pool, &new_result).await; + let _ = handle_comparison_result(&pool, &new_result, HashSet::new()).await; let comparison_results = get_comparison_results(&pool).await.unwrap(); let result = comparison_results.first().unwrap(); @@ -223,7 +225,7 @@ mod tests { }; let _ = save_comparison_result(&pool, &old_result).await; - let _ = handle_comparison_result(&pool, &new_result).await; + let _ = handle_comparison_result(&pool, &new_result, HashSet::new()).await; let comparison_results = get_comparison_results(&pool).await.unwrap(); assert_eq!( diff --git a/subgraph-radio/src/messages/poi.rs b/subgraph-radio/src/messages/poi.rs index 1faae02..0fef657 100644 --- a/subgraph-radio/src/messages/poi.rs +++ b/subgraph-radio/src/messages/poi.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use async_graphql::SimpleObject; use autometrics::autometrics; use chrono::Utc; @@ -27,6 +29,7 @@ use crate::database::{ insert_local_attestation, insert_remote_ppoi_message, }; use crate::operator::attestation::process_ppoi_message; + use crate::{ metrics::CACHED_PPOI_MESSAGES, operator::{ @@ -295,7 +298,11 @@ pub async fn send_poi_message( /// we should update PersistedState::remote_ppoi_message standalone /// from GraphcastMessage field such as nonce #[autometrics(track_concurrency)] -pub async fn process_valid_message(msg: GraphcastMessage, pool: &SqlitePool) { +pub async fn process_valid_message( + msg: GraphcastMessage, + allocated: bool, + pool: &SqlitePool, +) { let identifier = msg.identifier.clone(); if let Err(e) = insert_remote_ppoi_message(pool, &msg).await { @@ -307,7 +314,7 @@ pub async fn process_valid_message(msg: GraphcastMessage, pool if let Ok(message_count) = count_remote_ppoi_messages(pool, &identifier).await { // Update the metrics CACHED_PPOI_MESSAGES - .with_label_values(&[&identifier]) + .with_label_values(&[&identifier, &allocated.to_string()]) .set(message_count.into()); } else { error!("Error counting remote ppoi messages."); @@ -321,6 +328,7 @@ pub async fn poi_message_comparison( collect_window_duration: u64, callbook: CallBook, db: SqlitePool, + allocated_subgraphs: HashSet, ) -> Result { let time = Utc::now().timestamp() as u64; @@ -355,9 +363,10 @@ pub async fn poi_message_comparison( .collect::>(); // Process the filtered POI messages to get remote attestations - let remote_attestations = process_ppoi_message(filtered_messages, &callbook) - .await - .map_err(OperationError::Attestation)?; + let remote_attestations = + process_ppoi_message(filtered_messages, &callbook, allocated_subgraphs.clone()) + .await + .map_err(OperationError::Attestation)?; let local_attestation = get_local_attestation(&db, &id, compare_block) .await @@ -372,6 +381,7 @@ pub async fn poi_message_comparison( compare_block, &remote_attestations, &id, + allocated_subgraphs, ); Ok(comparison_result) diff --git a/subgraph-radio/src/metrics/mod.rs b/subgraph-radio/src/metrics/mod.rs index 52b2dd3..dc7d64a 100644 --- a/subgraph-radio/src/metrics/mod.rs +++ b/subgraph-radio/src/metrics/mod.rs @@ -17,7 +17,7 @@ pub static VALIDATED_MESSAGES: Lazy = Lazy::new(|| { Opts::new("validated_messages", "Number of validated messages") .namespace("graphcast") .subsystem("subgraph_radio"), - &["deployment", "message_type"], + &["deployment", "message_type", "allocated"], ) .expect("Failed to create validated_messages counters"); prometheus::register(Box::new(m.clone())) @@ -32,11 +32,11 @@ pub static CACHED_PPOI_MESSAGES: Lazy = Lazy::new(|| { Opts::new("cached_ppoi_messages", "Number of messages in cache") .namespace("graphcast") .subsystem("subgraph_radio"), - &["deployment"], + &["deployment", "allocated"], ) .expect("Failed to create cached_ppoi_messages gauges"); prometheus::register(Box::new(m.clone())) - .expect("Failed to register cached_ppoi_messages guage"); + .expect("Failed to register cached_ppoi_messages gauge"); m }); @@ -50,7 +50,7 @@ pub static ACTIVE_INDEXERS: Lazy = Lazy::new(|| { ) .namespace("graphcast") .subsystem("subgraph_radio"), - &["deployment"], + &["deployment", "allocated"], ) .expect("Failed to create active_indexers gauge"); prometheus::register(Box::new(m.clone())).expect("Failed to register ACTIVE_INDEXERS counter"); @@ -154,7 +154,7 @@ pub static LATEST_MESSAGE_TIMESTAMP: Lazy = Lazy::new(|| { ) .namespace("graphcast") .subsystem("subgraph_radio"), - &["deployment_hash"], + &["deployment_hash", "allocated"], ) .expect("Failed to create latest_message_timestamp gauge"); prometheus::register(Box::new(m.clone())) @@ -168,7 +168,7 @@ pub static ATTESTED_MAX_STAKE_WEIGHT: Lazy = Lazy::new(|| { Opts::new("attested_max_stake_weight", "Highest stake-backed POI") .namespace("graphcast") .subsystem("subgraph_radio"), - &["deployment_hash"], + &["deployment_hash", "allocated"], ) .expect("Failed to create attested_max_stake_weight gauge"); prometheus::register(Box::new(m.clone())).expect("Failed to register max_poi_stake gauge"); @@ -199,7 +199,7 @@ pub static COMPARISON_RESULTS: Lazy = Lazy::new(|| { ) .namespace("graphcast") .subsystem("subgraph_radio"), - &["deployment"], + &["deployment", "allocation"], ) .expect("Failed to create comparison_results gauge"); prometheus::register(Box::new(m.clone())).expect("Failed to register COMPARISON_RESULTS gauge"); diff --git a/subgraph-radio/src/operator/attestation.rs b/subgraph-radio/src/operator/attestation.rs index 3175b78..d311b9d 100644 --- a/subgraph-radio/src/operator/attestation.rs +++ b/subgraph-radio/src/operator/attestation.rs @@ -113,6 +113,7 @@ pub type LocalAttestationsMap = HashMap>; pub async fn process_ppoi_message( messages: Vec>, callbook: &CallBook, + allocated_subgraphs: HashSet, ) -> Result { let start_time = Instant::now(); @@ -135,12 +136,14 @@ pub async fn process_ppoi_message( .map_err(|e| AttestationError::BuildError(MessageError::FieldDerivations(e)))? as u64; + let allocated = allocated_subgraphs.contains(&msg.identifier); + FREQUENT_SENDERS_COUNTER .with_label_values(&[&radio_msg.graph_account]) .inc(); LATEST_MESSAGE_TIMESTAMP - .with_label_values(&[&msg.identifier]) + .with_label_values(&[&msg.identifier, &allocated.to_string()]) .set(radio_msg.nonce as f64); let blocks = remote_attestations @@ -185,7 +188,10 @@ pub async fn process_ppoi_message( .entry(first_msg.identifier.to_string()) .or_default(); - let active_indexers = ACTIVE_INDEXERS.with_label_values(&[&first_msg.identifier.to_string()]); + let allocated = allocated_subgraphs.contains(&first_msg.identifier.to_string()); + + let active_indexers = ACTIVE_INDEXERS + .with_label_values(&[&first_msg.identifier.to_string(), &allocated.to_string()]); let senders = combine_senders(blocks.entry(first_msg.payload.block_number).or_default()); active_indexers.set(senders.len().try_into().unwrap()); @@ -319,6 +325,7 @@ impl ComparisonResult { pub async fn handle_comparison_result( pool: &SqlitePool, new_comparison_result: &ComparisonResult, + allocated_subgraphs: HashSet, ) -> Result { let deployment_hash = new_comparison_result.deployment_hash(); let existing_result = get_comparison_results_by_deployment(pool, &deployment_hash) @@ -326,8 +333,10 @@ pub async fn handle_comparison_result( .into_iter() .next(); + let allocated = allocated_subgraphs.contains(&deployment_hash.to_string()); + COMPARISON_RESULTS - .with_label_values(&[&deployment_hash]) + .with_label_values(&[&deployment_hash, &allocated.to_string()]) .inc(); // Determine if the state has changed and if the database operation is successful @@ -475,7 +484,10 @@ pub fn compare_attestations( attestation_block: u64, remote: &RemoteAttestationsMap, ipfs_hash: &str, + allocated_subgraphs: HashSet, ) -> ComparisonResult { + let allocated = allocated_subgraphs.contains(&ipfs_hash.to_string()); + // Attempt to retrieve remote attestations for the given IPFS hash and block number let remote_attestations = remote .get(ipfs_hash) @@ -496,19 +508,19 @@ pub fn compare_attestations( let result_type = match (&most_attested_poi, &local_attestation) { (Some(most_attested), Some(local_att)) if most_attested.ppoi == local_att.ppoi => { ATTESTED_MAX_STAKE_WEIGHT - .with_label_values(&[ipfs_hash]) + .with_label_values(&[ipfs_hash, &allocated.to_string()]) .set(most_attested.stake_weight as f64); ComparisonResultType::Match } (Some(most_attested), Some(_)) => { ATTESTED_MAX_STAKE_WEIGHT - .with_label_values(&[ipfs_hash]) + .with_label_values(&[ipfs_hash, &allocated.to_string()]) .set(most_attested.stake_weight as f64); ComparisonResultType::Divergent } (Some(most_attested), None) => { ATTESTED_MAX_STAKE_WEIGHT - .with_label_values(&[ipfs_hash]) + .with_label_values(&[ipfs_hash, &allocated.to_string()]) .set(most_attested.stake_weight as f64); ComparisonResultType::NotFound } @@ -635,6 +647,7 @@ pub async fn process_comparison_results( result_strings: Vec>, notifier: Notifier, db: SqlitePool, + allocated_subgraphs: HashSet, ) { // Generate attestation summary let mut match_strings = vec![]; @@ -647,7 +660,13 @@ pub async fn process_comparison_results( for result in result_strings { match result { Ok(comparison_result) => { - match handle_comparison_result(&db, &comparison_result.clone()).await { + match handle_comparison_result( + &db, + &comparison_result.clone(), + allocated_subgraphs.clone(), + ) + .await + { Ok(result_type) => match result_type { ComparisonResultType::Match => { match_strings.push(comparison_result.to_string()); diff --git a/subgraph-radio/src/operator/mod.rs b/subgraph-radio/src/operator/mod.rs index 4ceb2bf..dd23e40 100644 --- a/subgraph-radio/src/operator/mod.rs +++ b/subgraph-radio/src/operator/mod.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::env; use std::path::Path; use std::sync::{atomic::Ordering, mpsc::Receiver, Arc}; @@ -28,7 +29,6 @@ use crate::operator::attestation::log_gossip_summary; use crate::operator::attestation::process_comparison_results; use crate::operator::notifier::NotificationMode; use crate::server::run_server; -use crate::GRAPHCAST_AGENT; use crate::{ chainhead_block_str, messages::poi::{process_valid_message, PublicPoiMessage}, @@ -38,6 +38,7 @@ use crate::{ operator::{attestation::ComparisonResultType, indexer_management::health_query}, }; use crate::{config::Config, shutdown, ControlFlow}; +use crate::{syncing_deployment_hashes, GRAPHCAST_AGENT}; use self::notifier::Notifier; @@ -97,7 +98,7 @@ impl RadioOperator { debug!("Set global static instance of graphcast_agent"); _ = GRAPHCAST_AGENT.set(graphcast_agent.clone()); - config.validate_indexer_address().await; + //config.validate_indexer_address().await; //TODO: Refactor indexer management server validation to SDK, similar to graph node status endpoint if let Some(url) = &config.graph_stack.indexer_management_server_endpoint { @@ -323,12 +324,19 @@ impl RadioOperator { ) .await; + let allocated_subgraphs: HashSet = + syncing_deployment_hashes(self.config.graph_stack().graph_node_status_endpoint()) + .await + .into_iter() + .collect(); + process_comparison_results( blocks_str, identifiers.len(), comparison_res, self.notifier.clone(), - self.db.clone() + self.db.clone(), + allocated_subgraphs ) }).await; @@ -466,9 +474,17 @@ pub async fn process_message( let nonces = agent.nonces.clone(); let local_sender = agent.graphcast_identity.graphcast_id.clone(); + let allocated_subgraphs: HashSet = + syncing_deployment_hashes(config.graph_stack().graph_node_status_endpoint()) + .await + .into_iter() + .collect(); + // handle each message based on their type match determine_message_type(msg) { TypedMessage::PublicPoi(msg) => { + let allocated = allocated_subgraphs.contains(&msg.identifier); + trace!( message = tracing::field::debug(&msg), "Handling a Public PoI message", @@ -491,9 +507,13 @@ pub async fn process_message( .is_ok() { VALIDATED_MESSAGES - .with_label_values(&[&msg.identifier, "public_poi_message"]) + .with_label_values(&[ + &msg.identifier, + &allocated.to_string(), + "public_poi_message", + ]) .inc(); - process_valid_message(msg.clone(), db).await; + process_valid_message(msg.clone(), allocated, db).await; } } Err(e) => { @@ -535,8 +555,14 @@ pub async fn process_message( .validity_check(msg, &config.graph_stack.network_subgraph.clone()) .await { + let allocated = allocated_subgraphs.contains(&msg.identifier); + VALIDATED_MESSAGES - .with_label_values(&[&msg.identifier, "upgrade_intent_message"]) + .with_label_values(&[ + &msg.identifier, + &allocated.to_string(), + "upgrade_intent_message", + ]) .inc(); if radio_msg .process_valid_message(&config, ¬ifier, db) diff --git a/subgraph-radio/src/operator/operation.rs b/subgraph-radio/src/operator/operation.rs index ee862a2..cd5e401 100644 --- a/subgraph-radio/src/operator/operation.rs +++ b/subgraph-radio/src/operator/operation.rs @@ -1,7 +1,7 @@ use autometrics::autometrics; use sqlx::SqlitePool; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use tracing::{debug, error, warn}; use graphcast_sdk::{ @@ -10,7 +10,7 @@ use graphcast_sdk::{ }; use crate::database::{clean_remote_ppoi_messages, delete_outdated_local_attestations}; -use crate::DatabaseError; +use crate::{syncing_deployment_hashes, DatabaseError}; use crate::messages::poi::{poi_message_comparison, send_poi_message}; @@ -132,10 +132,22 @@ impl RadioOperator { let callbook = self.config.callbook().clone(); let db = self.db.clone(); - let compare_handle = tokio::spawn(async move { - poi_message_comparison(id.clone(), collect_duration, callbook, db) + let allocated_subgraphs: HashSet = + syncing_deployment_hashes(self.config.graph_stack().graph_node_status_endpoint()) .await - .map_err(OperationError::from) + .into_iter() + .collect(); + + let compare_handle = tokio::spawn(async move { + poi_message_comparison( + id.clone(), + collect_duration, + callbook, + db, + allocated_subgraphs, + ) + .await + .map_err(OperationError::from) }); compare_handles.push(compare_handle); } diff --git a/subgraph-radio/src/server/model/mod.rs b/subgraph-radio/src/server/model/mod.rs index 7f0cd35..f961a0b 100644 --- a/subgraph-radio/src/server/model/mod.rs +++ b/subgraph-radio/src/server/model/mod.rs @@ -7,9 +7,12 @@ use crate::{ get_comparison_results, get_comparison_results_by_deployment, get_upgrade_intent_message_by_id, get_upgrade_intent_messages, }, - DatabaseError, OperationError, + syncing_deployment_hashes, DatabaseError, OperationError, +}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, }; -use std::{collections::HashMap, sync::Arc}; use thiserror::Error; use crate::{ @@ -115,11 +118,22 @@ impl QueryRoot { result_type: Option, ) -> Result, HttpServiceError> { let res = self - .comparison_results(ctx, deployment, block, result_type) + .comparison_results(ctx, deployment.clone(), block, result_type) .await?; let local_info = self.indexer_info(ctx).await?; let mut ratios = vec![]; + + let config = ctx + .data_unchecked::>() + .radio_config(); + + let allocated_subgraphs: HashSet = + syncing_deployment_hashes(config.graph_stack().graph_node_status_endpoint()) + .await + .into_iter() + .collect(); + for r in res { match aggregate_attestation(r.clone(), &local_info) { Ok((aggregated_attestations, local_ppoi)) => { @@ -128,11 +142,14 @@ impl QueryRoot { local_ppoi.as_deref(), local_info.stake, ); + let allocated = allocated_subgraphs.contains(&r.deployment); + ratios.push(CompareRatio::new( r.deployment, r.block_number, sender_ratio, stake_ratio, + allocated, )); } Err(e) => { @@ -349,16 +366,23 @@ impl SubgraphRadioContext { .remote_ppoi_messages_filtered(&identifier, &Some(block)) .await; - let remote_attestations = process_ppoi_message(msgs, &config.callbook()) - .await - .ok() - .and_then(|r| { - r.get(&entry.identifier) - .and_then(|deployment_attestations| { - deployment_attestations.get(&entry.block_number).cloned() - }) - }) - .unwrap_or_default(); + let allocated_subgraphs: HashSet = + syncing_deployment_hashes(config.graph_stack().graph_node_status_endpoint()) + .await + .into_iter() + .collect(); + + let remote_attestations = + process_ppoi_message(msgs, &config.callbook(), allocated_subgraphs) + .await + .ok() + .and_then(|r| { + r.get(&entry.identifier) + .and_then(|deployment_attestations| { + deployment_attestations.get(&entry.block_number).cloned() + }) + }) + .unwrap_or_default(); let r = compare_attestation(entry, remote_attestations); if result_type.is_none() || (Some(r.result_type) == result_type) { @@ -488,6 +512,7 @@ struct CompareRatio { block_number: u64, sender_ratio: String, stake_ratio: String, + allocated: bool, } impl CompareRatio { @@ -496,12 +521,14 @@ impl CompareRatio { block_number: u64, sender_ratio: String, stake_ratio: String, + allocated: bool, ) -> Self { CompareRatio { deployment, block_number, sender_ratio, stake_ratio, + allocated, } } }