presync feedback loop - subcommand indexing status #11

Merged · 2 commits · Aug 29, 2023
31 changes: 15 additions & 16 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -28,6 +28,7 @@ ethers-contract = "2.0.4"
ethers-core = "2.0.4"
ethers-derive-eip712 = "1.0.2"
async-graphql = "4.0.16"
reqwest = "0.11.20"

[dev-dependencies.cargo-husky]
version = "1"
13 changes: 12 additions & 1 deletion README.md
@@ -13,10 +13,12 @@ When developers publish a new version (subgraph deployment) to their subgraph, d

Indexers running the subgraph radio and listening to that channel will in turn receive the message and can start offchain syncing the new deployment.

It is still at the subgraph developers' discretion to wait for the indexers to sync up to chain head, at which point they can publish the staged version without disrupting API usage.
It is still at the subgraph developers' discretion to wait for the indexers to sync up to chain head, at which point they can publish the staged version without disrupting API usage. This tool provides a convenience function that allows subgraph developers to ....

## 🆙 Example usage

### UpgradePresync

To send a message on Graphcast mainnet for the subgraph deployment "QmacQnSgia4iDPWHpeY6aWxesRFdb8o5DKZUx96zZqEWrB", we need its subgraph id "CnJMdCkW3pr619gsJVtUPAWxspALPdCMw6o7obzYBNp3", the private key of the graph account (or of an operator of the graph account), the subgraph owner's graph account, and the new deployment hash. You can supply them as CLI arguments

```
@@ -40,6 +42,15 @@ The entire process from running the binary to sending the message should take ~4
2023-07-31T17:56:59.328490Z INFO Sent message, msg_id: "0xc6b1131e0f8302abe48057f6fc69722ab46bd4285c2c4a8a8bdca6b221dcda96"
```

### IndexingStatus

After sending `UpgradeIntentMessage`, a developer can periodically check the indexing status of the new subgraph deployment at the public APIs of the indexers who actively allocate to the current version of the subgraph.

The same arguments used for `UpgradeIntentMessage` apply here. However, no gossip is involved in this operation; the statuses are fetched through deterministic queries.

```
cargo run indexing-status --subgraph-id "0x00000444e5a1a667663b0adfd853e8efa0470698-0" --new-hash QmfDJFYaDX7BdwT6rYa8Bx71vPjTueUVDN99pdwFgysDiZ
```
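To make "deterministic queries" concrete, the sketch below builds the kind of GraphQL request body that would be POSTed to each indexer's public status endpoint. This is illustrative only: the field names follow graph-node's index-node status API and are an assumption, not taken from this PR's `query` module.

```rust
// Hypothetical helper (not part of this repo): construct the JSON body for a
// minimal indexingStatuses query on a single deployment hash.
fn build_status_query(deployment_hash: &str) -> String {
    format!(
        "{{\"query\": \"{{ indexingStatuses(subgraphs: [\\\"{}\\\"]) {{ synced health chains {{ chainHeadBlock {{ number }} latestBlock {{ number }} }} }} }}\"}}",
        deployment_hash
    )
}

fn main() {
    let body = build_status_query("QmfDJFYaDX7BdwT6rYa8Bx71vPjTueUVDN99pdwFgysDiZ");
    assert!(body.contains("indexingStatuses"));
    println!("{body}");
}
```

Because the query depends only on the deployment hash, every indexer receives the same request, which is what makes the per-indexer responses directly comparable.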

## 🧪 Testing

26 changes: 25 additions & 1 deletion src/config.rs
@@ -48,7 +48,6 @@ pub struct Config {
impl Config {
/// Parse config arguments
pub fn args() -> Self {
// TODO: load config file before parse (maybe add new level of subcommands)
let config = Config::parse();
std::env::set_var("RUST_LOG", config.radio_infrastructure().log_level.clone());
// Enables tracing under RUST_LOG variable
@@ -341,6 +340,10 @@ pub enum Commands {
long_about = "A subgraph developer can send a gossip to inform indexers of the new version of a subgraph before publishing
")]
UpgradePresync(UpgradePresyncArg),
#[clap(aliases = ["status"], about = "Query the indexing status of a subgraph deployment at the indexers currently allocated to the subgraph id",
long_about = "A subgraph developer can quickly query the indexing status of a deployment (new_hash) at the public status APIs of indexers actively allocated to the subgraph_id
")]
IndexingStatus(IndexingStatusArg),
}

#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)]
@@ -362,6 +365,27 @@ pub struct UpgradePresyncArg {
pub new_hash: String,
}

//TODO: Add network to support multi-network queries; currently we just take
//the first chain in the indexing status returned by allocated indexers
#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)]
#[group(required = true, multiple = true)]
pub struct IndexingStatusArg {
#[clap(
long,
value_name = "SUBGRAPH_ID",
env = "SUBGRAPH_ID",
help = "Subgraph id shared by the old and new deployment"
)]
pub subgraph_id: String,
#[clap(
long,
value_name = "NEW_HASH",
env = "NEW_HASH",
help = "Subgraph deployment hash for the upgrade version of the subgraph"
)]
pub new_hash: String,
}

#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
#[error("Validate the input: {0}")]
10 changes: 1 addition & 9 deletions src/lib.rs
@@ -1,12 +1,4 @@
use once_cell::sync::OnceCell;
use std::sync::Arc;

use graphcast_sdk::graphcast_agent::GraphcastAgent;

pub mod config;
pub mod messages;
pub mod operator;

/// A global static (singleton) instance of GraphcastAgent. It is useful to ensure that we have only one GraphcastAgent
/// per Radio instance, so that we can keep track of state and more easily test our Radio application.
pub static GRAPHCAST_AGENT: OnceCell<Arc<GraphcastAgent>> = OnceCell::new();
pub mod query;
29 changes: 16 additions & 13 deletions src/main.rs
@@ -3,7 +3,7 @@ use std::sync::mpsc;
use dotenv::dotenv;
use graphcast_cli::{
config::{Commands, Config},
operator::RadioOperator,
operator::{operation::indexing_status, RadioOperator},
};
use graphcast_sdk::{graphcast_agent::GraphcastAgent, WakuMessage};

@@ -13,20 +13,23 @@ async fn main() {
// Parse basic configurations
let radio_config = Config::args();

// The channel is not used in CLI
let (sender, _) = mpsc::channel::<WakuMessage>();
let agent = GraphcastAgent::new(
radio_config.to_graphcast_agent_config().await.unwrap(),
sender,
)
.await
.expect("Initialize Graphcast agent");

let radio_operator = RadioOperator::new(&radio_config, agent).await;

match radio_config.subcommand() {
match &radio_config.subcommand() {
Commands::UpgradePresync(args) => {
// The channel is not used in CLI
let (sender, _) = mpsc::channel::<WakuMessage>();
let agent = GraphcastAgent::new(
radio_config.to_graphcast_agent_config().await.unwrap(),
sender,
)
.await
.expect("Initialize Graphcast agent");

let radio_operator = RadioOperator::new(&radio_config, agent).await;
radio_operator.upgrade_presync(args).await;
}
Commands::IndexingStatus(args) => {
// No graphcast agent or radio operator needed
indexing_status(&radio_config, args).await;
}
};
}
36 changes: 2 additions & 34 deletions src/operator/mod.rs
@@ -1,11 +1,9 @@
use graphcast_sdk::graphql::client_graph_account::subgraph_hash_by_id;
use std::sync::Arc;
use tracing::{debug, warn};
use tracing::debug;

use graphcast_sdk::graphcast_agent::GraphcastAgent;

use crate::config::{Config, UpgradePresyncArg};
use crate::GRAPHCAST_AGENT;
use crate::config::Config;
pub mod operation;

/// Radio operator contains all states needed for radio operations
@@ -22,8 +20,6 @@ impl RadioOperator {
debug!("Initializing Graphcast Agent");
let graphcast_agent = Arc::new(agent);

_ = GRAPHCAST_AGENT.set(graphcast_agent.clone());

RadioOperator {
config: config.clone(),
graphcast_agent,
@@ -33,32 +29,4 @@ impl RadioOperator {
pub fn graphcast_agent(&self) -> &GraphcastAgent {
&self.graphcast_agent
}

/// radio continuously attempt to send message until success
pub async fn upgrade_presync(&self, args: &UpgradePresyncArg) {
let mut current_attempt: u64 = 0;

// Set subscription topic
let identifier = subgraph_hash_by_id(
self.config.graph_stack().network_subgraph(),
&self.config.graph_stack().graph_account,
&args.subgraph_id,
)
.await
.expect("Failed to match the upgrade intent with an existing subgraph deployment");
self.graphcast_agent
.update_content_topics(vec![identifier])
.await;

let mut res = self.gossip_one_shot(args).await;
// Try again if the gossip failed to send while the attempt number is within max_retry
while res.is_err() && current_attempt < self.config.max_retry {
warn!(
err = tracing::field::debug(&res),
current_attempt, "Failed to gossip, retry"
);
current_attempt += 1;
res = self.gossip_one_shot(args).await;
}
}
}
98 changes: 96 additions & 2 deletions src/operator/operation.rs
@@ -1,12 +1,14 @@
use chrono::Utc;
use graphcast_sdk::graphql::client_graph_account::subgraph_hash_by_id;
use tracing::{error, info};
use graphcast_sdk::graphql::QueryError;
use tracing::{debug, error, info, warn};

use graphcast_sdk::graphcast_agent::GraphcastAgentError;

use crate::config::UpgradePresyncArg;
use crate::config::{Config, IndexingStatusArg, UpgradePresyncArg};
use crate::messages::upgrade::UpgradeIntentMessage;
use crate::operator::RadioOperator;
use crate::query::{query_indexer_public_api, query_indexing_statuses, IndexerInfo};

impl RadioOperator {
pub async fn gossip_one_shot(
@@ -41,4 +43,96 @@
}
}
}

/// Radio attempts to send the message until it succeeds or reaches the configured max retry
pub async fn upgrade_presync(&self, args: &UpgradePresyncArg) {
let mut current_attempt: u64 = 0;

// Set subscription topic
let identifier = subgraph_hash_by_id(
self.config.graph_stack().network_subgraph(),
&self.config.graph_stack().graph_account,
&args.subgraph_id,
)
.await
.expect("Failed to match the upgrade intent with an existing subgraph deployment");
self.graphcast_agent
.update_content_topics(vec![identifier])
.await;

let mut res = self.gossip_one_shot(args).await;
// Try again if the gossip failed to send while the attempt number is within max_retry
while res.is_err() && current_attempt < self.config.max_retry {
warn!(
err = tracing::field::debug(&res),
current_attempt, "Failed to gossip, retry"
);
current_attempt += 1;
res = self.gossip_one_shot(args).await;
}
}
}

/// Query the new deployment's indexing status at the public status endpoints registered
/// by the indexers who are actively allocating to the current deployment
pub async fn indexing_status(config: &Config, args: &IndexingStatusArg) {
// Get list of public status APIs
let public_status_apis = query_indexer_public_api(&config.graph_stack().network_subgraph, &args.subgraph_id).await
.expect("Could not query public status APIs from indexers actively allocated on the network subgraph");
info!(
num_apis = public_status_apis.len(),
"Number of APIs to query indexing status"
);

// Query all the public status APIs for new_hash indexing_status
let new_hash_statuses = query_indexing_statuses(public_status_apis, &args.new_hash).await;
debug!("new_hash_statuses {:#?}", new_hash_statuses);
// Summarize the results
summarize_indexing_statuses(new_hash_statuses);
}

/// Summarize indexing statuses: number of queried indexers, how many are synced, and block progress
pub fn summarize_indexing_statuses(statuses: Vec<Result<IndexerInfo, QueryError>>) {
let okay_results: Vec<IndexerInfo> = statuses
.iter()
.filter_map(|r| r.as_ref().ok())
.cloned()
.collect();
let synced_indexers = okay_results
.iter()
.filter(|&indexer| indexer.status.synced)
.count();
//TODO: to support multiple chains, check chain id specifications; right now we simply take the first chain
let block_progress: String = okay_results
.iter()
.max_by_key(|indexer| indexer.latest_block_number())
.map(|indexer| {
let chain_head = indexer.chain_head_block_number();
let latest = indexer.latest_block_number();
// Report as "latest / chain head" to match the latest_synced_block field name
format!("{} / {}", latest, chain_head)
})
.unwrap_or(String::from("N/A"));
let num_indexing_indexers = okay_results.len();
let avg_progress: f32 = okay_results
.iter()
.map(|indexer| {
let progress = indexer.latest_block_number() as f32
/ indexer.chain_head_block_number() as f32
* 100.0;
debug!(
indexer = tracing::field::debug(&indexer.info),
progress, "Indexer statuses"
);
progress
})
.sum::<f32>()
/ num_indexing_indexers as f32;
info!(
num_currently_allocated_indexer_apis = statuses.len(),
num_indexing_indexers = num_indexing_indexers,
num_synced_indexers = synced_indexers,
latest_synced_block = block_progress,
average_progress = format!("{}%", avg_progress),
"Indexing statuses summary"
);
}
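The average-progress computation above can be illustrated with a small standalone sketch. The tuples of block numbers are mocked and `IndexerInfo` and the query layer are elided; note that the sketch adds an empty-slice guard, since averaging over zero responding indexers would otherwise divide by zero.

```rust
// Standalone sketch of the summary math: each tuple is
// (latest_block, chain_head_block) for one responding indexer.
fn average_progress(blocks: &[(u64, u64)]) -> f32 {
    assert!(!blocks.is_empty(), "guard against division by zero");
    blocks
        .iter()
        .map(|(latest, head)| *latest as f32 / *head as f32 * 100.0)
        .sum::<f32>()
        / blocks.len() as f32
}

fn main() {
    // One fully synced indexer and one at half progress.
    let progress = average_progress(&[(100, 100), (50, 100)]);
    assert!((progress - 75.0).abs() < 1e-3);
    println!("average progress: {progress}%");
}
```

The mean is taken per indexer rather than per block, so a lagging indexer with a short chain weighs the same as one on a long chain.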