Skip to content
This repository has been archived by the owner on Jul 20, 2023. It is now read-only.

Commit

Permalink
fix: add test retries and gc msg valid check
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Jul 12, 2023
1 parent 5e3f0c1 commit 444fae1
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 105 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions one-shot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ keywords = ["graphprotocol", "data-integrity", "Indexer", "waku", "p2p"]
categories = ["network-programming", "web-programming::http-client"]

[dependencies]
# graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "f39cb28" }
graphcast-sdk = { package = "graphcast-sdk", path = "../../graphcast-rs" }
graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "99d942e" }
poi-radio = { path = "../poi-radio" }
prost = "0.11"
once_cell = "1.17"
Expand Down
3 changes: 1 addition & 2 deletions poi-radio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ keywords = ["graphprotocol", "data-integrity", "Indexer", "waku", "p2p"]
categories = ["network-programming", "web-programming::http-client"]

[dependencies]
# graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "f39cb28" }
graphcast-sdk = { package = "graphcast-sdk", path = "../../graphcast-rs" }
graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "99d942e" }
prost = "0.11"
once_cell = "1.17"
chrono = "0.4"
Expand Down
3 changes: 2 additions & 1 deletion poi-radio/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use tracing::{debug, info, trace};

use crate::state::PersistedState;
use crate::state::{panic_hook, PersistedState};
use crate::{active_allocation_hashes, syncing_deployment_hashes};

#[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize, Default)]
Expand Down Expand Up @@ -400,6 +400,7 @@ impl Config {
"Loaded Persisted state cache"
);

panic_hook(path);
state
} else {
debug!("Created new state");
Expand Down
59 changes: 0 additions & 59 deletions poi-radio/src/messages/mod.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,2 @@
use graphcast_sdk::graphcast_agent::{
message_typing::GraphcastMessage, waku_handling::WakuHandlingError,
};
use poi::PublicPoiMessage;
use std::{
any::Any,
sync::{mpsc, Mutex as SyncMutex},
};
use tracing::{error, trace};
use upgrade::VersionUpgradeMessage;

pub mod poi;
pub mod upgrade;

#[derive(Debug, Clone, serde_derive::Deserialize, serde_derive::Serialize)]
pub enum MessageType {
PublicPoi(GraphcastMessage<poi::PublicPoiMessage>),
VersionUpgrade(GraphcastMessage<upgrade::VersionUpgradeMessage>),
}

pub fn typed_handler(sender: SyncMutex<mpsc::Sender<MessageType>>, msg: &dyn Any) {
if let Some(Ok(ppoi_message)) =
msg.downcast_ref::<Result<GraphcastMessage<PublicPoiMessage>, WakuHandlingError>>()
{
trace!(
ppoi_message = tracing::field::debug(&ppoi_message),
"Received Graphcast validated message"
);

// let id = ppoi_message.identifier.clone();
// VALIDATED_MESSAGES.with_label_values(&[&id]).inc();

match sender
.lock()
.unwrap()
.send(MessageType::PublicPoi(ppoi_message.clone()))
{
Ok(_) => trace!("Sent received message to radio operator"),
Err(e) => error!("Could not send message to channel: {:#?}", e),
}
} else if let Some(Ok(upgrade_message)) =
msg.downcast_ref::<Result<GraphcastMessage<VersionUpgradeMessage>, WakuHandlingError>>()
{
trace!(
upgrade_message = tracing::field::debug(&upgrade_message),
"Received Graphcast validated message"
);

// let id = upgrade_message.identifier.clone();
// VALIDATED_MESSAGES.with_label_values(&[&id]).inc();

match sender
.lock()
.unwrap()
.send(MessageType::VersionUpgrade(upgrade_message.clone()))
{
Ok(_) => trace!("Sent upgrade message to radio operator"),
Err(e) => error!("Could not send message to channel: {:#?}", e),
}
}
}
54 changes: 51 additions & 3 deletions poi-radio/src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ use tracing::{debug, error, info, trace, warn};

use graphcast_sdk::{
build_wallet,
graphcast_agent::{message_typing::GraphcastMessage, GraphcastAgent},
graphcast_agent::{
message_typing::{check_message_validity, GraphcastMessage},
waku_handling::WakuHandlingError,
GraphcastAgent,
},
graphql::client_graph_node::{subgraph_network_blocks, update_network_chainheads},
};

Expand Down Expand Up @@ -117,18 +121,43 @@ impl RadioOperator {
let state_ref = persisted_state.clone();
let upgrade_notifier = notifier.clone();
let graph_node = config.graph_node_endpoint().clone();

// try message format in order of PublicPOIMessage, VersionUpgradeMessage
tokio::spawn(async move {
for msg in receiver {
trace!("Decoding waku message into Graphcast Message with Radio specified payload");
let agent = GRAPHCAST_AGENT
.get()
.expect("Could not retrieve Graphcast agent");
let id_validation = agent.id_validation.clone();
let callbook = agent.callbook.clone();
let nonces = agent.nonces.clone();
let local_sender = agent.graphcast_identity.graphcast_id.clone();
if let Ok(msg) = agent.decoder::<PublicPoiMessage>(msg.payload()).await {
trace!(
message = tracing::field::debug(&msg),
"Parsed and validated as Public PoI message",
"Parseable as Public PoI message, now validate",
);
let msg = match check_message_validity(
msg,
&nonces,
callbook.clone(),
local_sender.clone(),
&id_validation,
)
.await
.map_err(|e| WakuHandlingError::InvalidMessage(e.to_string()))
{
Ok(msg) => msg,
Err(e) => {
debug!(
err = tracing::field::debug(e),
"Failed to validate by Graphcast"
);
continue;
}
};

let identifier = msg.identifier.clone();

let is_valid = msg.payload.validity_check(&msg, &graph_node).await;
Expand All @@ -150,8 +179,27 @@ impl RadioOperator {
{
trace!(
message = tracing::field::debug(&msg),
"Parsed and validated as Version Upgrade message",
"Parseable as Version Upgrade message, now validate",
);
let msg = match check_message_validity(
msg,
&nonces,
callbook.clone(),
local_sender.clone(),
&id_validation,
)
.await
.map_err(|e| WakuHandlingError::InvalidMessage(e.to_string()))
{
Ok(msg) => msg,
Err(e) => {
debug!(
err = tracing::field::debug(e),
"Failed to validate by Graphcast"
);
continue;
}
};
let is_valid = msg.payload.validity_check(&msg, &graph_node).await;

if let Ok(payload) = is_valid {
Expand Down
35 changes: 19 additions & 16 deletions poi-radio/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use serde::{Deserialize, Serialize};

use std::panic::PanicInfo;
use std::path::Path;

use std::fs;
use std::str::FromStr;
use std::sync::{Arc, Mutex as SyncMutex};
use std::{
collections::HashMap,
fs::{remove_file, File},
io::{BufReader, Write},
};
use std::{fs, panic};
use tracing::{info, trace, warn};

use graphcast_sdk::graphcast_agent::message_typing::GraphcastMessage;
Expand All @@ -17,6 +19,7 @@ use crate::operator::attestation::{
clear_local_attestation, ComparisonResult, ComparisonResultType,
};
use crate::operator::notifier::Notifier;
use crate::RADIO_OPERATOR;

use crate::{messages::poi::PublicPoiMessage, operator::attestation::Attestation};

Expand Down Expand Up @@ -283,21 +286,21 @@ impl PersistedState {
}

// TODO: panic hook for updating the cache file before exiting the program
// /// Set up panic hook to store persisted state
// pub fn panic_hook<'a>(file_path: &str) {
// let path = String::from_str(file_path).expect("Invalid file path provided");
// panic::set_hook(Box::new(move |panic_info| panic_cache(panic_info, &path)));
// }

// pub fn panic_cache(panic_info: &PanicInfo<'_>, file_path: &str) {
// RADIO_OPERATOR
// .get()
// .unwrap()
// .state()
// .update_cache(file_path);
// // Log panic information and program state
// eprintln!("Panic occurred! Panic info: {:?}", panic_info);
// }
/// Set up panic hook to store persisted state
pub fn panic_hook(file_path: &str) {
let path = String::from_str(file_path).expect("Invalid file path provided");
panic::set_hook(Box::new(move |panic_info| panic_cache(panic_info, &path)));
}

pub fn panic_cache(panic_info: &PanicInfo<'_>, file_path: &str) {
RADIO_OPERATOR
.get()
.unwrap()
.state()
.update_cache(file_path);
// Log panic information and program state
eprintln!("Panic occurred! Panic info: {:?}", panic_info);
}

#[cfg(test)]
mod tests {
Expand Down
3 changes: 1 addition & 2 deletions test-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ categories = [
[dependencies]
waku = { version = "0.1.1", package = "waku-bindings" }
test-utils = { path = "../test-utils" }
# graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "f39cb28" }
graphcast-sdk = { package = "graphcast-sdk", path = "../../graphcast-rs" }
graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "99d942e" }
poi-radio = { path = "../poi-radio" }
tokio = { version = "1.1.1", features = ["full", "rt"] }
tracing = "0.1"
Expand Down
28 changes: 19 additions & 9 deletions test-runner/src/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,16 @@ pub async fn topics_test() {

tokio::time::sleep(Duration::from_secs(50)).await;

let persisted_state = PersistedState::load_cache(&store_path);
debug!("persisted state {:?}", persisted_state);

let remote_messages = persisted_state.remote_messages();

let test_hash = "QmonlyintestsenderXyZABCdeFgHIjklMNOpqrstuvWXYZabcdEFG";
let has_test_hash = remote_messages
.iter()
.any(|msg| msg.identifier == test_hash);

let mut has_test_hash = test_result(&store_path, test_hash);

let max_test_attempts = 3;
let mut num_test_attempts = 0;
while num_test_attempts < max_test_attempts && !has_test_hash {
tokio::time::sleep(Duration::from_secs(config.topic_update_interval + 1)).await;
has_test_hash = test_result(&store_path, test_hash);
num_test_attempts += 1;
}
assert!(
has_test_hash,
"Expected remote message not found with identifier {}",
Expand All @@ -126,3 +126,13 @@ pub async fn topics_test() {

teardown(process_manager, &store_path);
}

fn test_result(store_path: &str, test_hash: &str) -> bool {
let persisted_state = PersistedState::load_cache(store_path);
debug!("persisted state {:?}", persisted_state);

let remote_messages = persisted_state.remote_messages();
remote_messages
.iter()
.any(|msg| msg.identifier == test_hash)
}
3 changes: 1 addition & 2 deletions test-sender/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ categories = [

[dependencies]
waku = { version = "0.1.1", package = "waku-bindings" }
# graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "f39cb28" }
graphcast-sdk = { package = "graphcast-sdk", path = "../../graphcast-rs" }
graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "99d942e" }
test-utils = { path = "../test-utils" }
poi-radio = { path = "../poi-radio" }
tokio = { version = "1.1.1", features = ["full", "rt"] }
Expand Down
13 changes: 8 additions & 5 deletions test-sender/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,20 @@ async fn start_sender(config: TestSenderConfig) {
let pubsub_topic = WakuPubSubTopic::from_str(pubsub_topic_str).unwrap();
loop {
for topic in config.topics.clone() {
let timestamp = Utc::now().timestamp();
let timestamp = (timestamp + 9) / 10 * 10;

let nodes = gather_nodes(vec![], &pubsub_topic);
// Connect to peers on the filter protocol
connect_multiaddresses(nodes, &node_handle, ProtocolId::Filter);

let content_topic = format!("/{}/0/{}/proto", config.radio_name, topic);
let content_topic = WakuContentTopic::from_str(&content_topic).unwrap();

// let nonce = config.nonce.clone().unwrap().parse::<i64>().unwrap();
let timestamp =
if let Some(n) = config.nonce.clone().and_then(|x| x.parse::<i64>().ok()) {
n
} else {
Utc::now().timestamp()
};
let block_number = (timestamp + 9) / 10 * 10;

let radio_payload_clone = config.radio_payload.clone();
match radio_payload_clone.as_deref() {
Expand All @@ -91,7 +94,7 @@ async fn start_sender(config: TestSenderConfig) {
config.poi.clone().unwrap(),
timestamp,
NetworkName::Goerli,
timestamp.try_into().unwrap(),
block_number.try_into().unwrap(),
config.block_hash.clone().unwrap(),
"0x7e6528e4ce3055e829a32b5dc4450072bac28bc6".to_string(),
);
Expand Down
3 changes: 1 addition & 2 deletions test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ categories = [

[dependencies]
waku = { version = "0.1.1", package = "waku-bindings" }
# graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "f39cb28" }
graphcast-sdk = { package = "graphcast-sdk", path = "../../graphcast-rs" }
graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "99d942e" }
poi-radio = { path = "../poi-radio" }
tokio = { version = "1.1.1", features = ["full", "rt"] }
tracing = "0.1"
Expand Down
4 changes: 2 additions & 2 deletions test-utils/src/mock_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ pub async fn start_mock_server(

async fn handler_graphql(subgraphs: Arc<Mutex<Vec<String>>>) -> Result<String, Infallible> {
let timestamp = Utc::now().timestamp();
let timestamp = (timestamp + 9) / 10 * 10;
let block_number = (timestamp + 9) / 10 * 10;
let subgraphs = subgraphs.lock().await;

// Prepare indexingStatuses part of the response dynamically from the subgraphs vector
let indexing_statuses: Vec<String> = subgraphs
.iter()
.map(|hash| format!(
r#"{{"subgraph": "{}", "synced": true, "health": "healthy", "node": "default", "fatalError": null, "chains": [{{"network": "mainnet", "latestBlock": {{"number": "{}", "hash": "b30395958a317ccc06da46782f660ce674cbe6792e5573dc630978c506114a0a"}}, "chainHeadBlock": {{"number": "{}", "hash": "b30395958a317ccc06da46782f660ce674cbe6792e5573dc630978c506114a0a"}}}}]}}"#,
hash, timestamp, timestamp
hash, block_number, block_number
))
.collect();

Expand Down

0 comments on commit 444fae1

Please sign in to comment.