Skip to content
This repository has been archived by the owner on Jul 20, 2023. It is now read-only.

Commit

Permalink
refactor: comparison results query resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Jul 3, 2023
1 parent 01f9b08 commit 2d57919
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 53 deletions.
2 changes: 2 additions & 0 deletions poi-radio/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub struct Config {
hide_env_values = true,
help = "Private key to the Graphcast ID wallet (Precendence over mnemonics)",
)]
// should keep this value private, this is current public due to the constructing a Config in test-utils
// We can get around this by making an explicit function to make config instead of direct build in {}
pub private_key: Option<String>,
#[clap(
long,
Expand Down
141 changes: 88 additions & 53 deletions poi-radio/src/server/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,8 @@ impl QueryRoot {
) -> Result<Vec<GraphcastMessage<RadioPayloadMessage>>, HttpServiceError> {
let msgs = ctx
.data_unchecked::<Arc<POIRadioContext>>()
.remote_messages();
let filtered = msgs
.iter()
.cloned()
.filter(|message| filter_remote_messages(message, &identifier, &block))
.collect::<Vec<_>>();
Ok(filtered)
.remote_messages_filtered(&identifier, &block);
Ok(msgs)
}

async fn local_attestations(
Expand All @@ -57,55 +52,31 @@ impl QueryRoot {
Ok(filtered)
}

// TODO: Reproduce tabular summary view. use process_message and compare_attestations
/// Function that optionally takes in identifier and block filters.
async fn comparison_results(
&self,
ctx: &Context<'_>,
deployment: Option<String>,
identifier: Option<String>,
block: Option<u64>,
_filter: Option<ResultFilter>,
) -> Result<Vec<ComparisonResult>, HttpServiceError> {
// Utilize the provided filters on local_attestations
let locals = attestations_to_vec(
&ctx.data_unchecked::<Arc<POIRadioContext>>()
.local_attestations(deployment.clone(), block),
);

let config = ctx.data_unchecked::<Arc<POIRadioContext>>().radio_config();
let network_subgraph = config.network_subgraph.clone();

let mut res = vec![];
for entry in locals {
let deployment_identifier = entry.deployment.clone();
let msgs = self
.radio_payload_messages(
ctx,
Some(deployment_identifier.clone()),
Some(entry.block_number),
)
.await?;
let remote_attestations = match process_messages(msgs, &network_subgraph).await {
Ok(r) => {
if let Some(deployment_attestations) = r.get(&deployment_identifier.clone()) {
if let Some(deployment_block_attestations) =
deployment_attestations.get(&entry.block_number)
{
deployment_block_attestations.clone()
} else {
continue;
}
} else {
continue;
}
}
Err(_e) => continue,
};
let res = &ctx
.data_unchecked::<Arc<POIRadioContext>>()
.comparison_results(identifier, block)
.await;

let r = compare_attestation(entry, remote_attestations);
res.push(r);
}
Ok(res.to_vec())
}

Ok(res)
/// Function to grab the latest relevant comparison result of a deployment
async fn comparison_result(
&self,
ctx: &Context<'_>,
identifier: String,
) -> Result<Option<ComparisonResult>, HttpServiceError> {
let res = &ctx
.data_unchecked::<Arc<POIRadioContext>>()
.comparison_result(identifier);
Ok(res.clone())
}

/// Return the sender ratio for remote attestations, with a "!" for the attestation matching local
Expand All @@ -114,22 +85,21 @@ impl QueryRoot {
ctx: &Context<'_>,
deployment: Option<String>,
block: Option<u64>,
filter: Option<ResultFilter>,
) -> Result<Vec<CompareRatio>, HttpServiceError> {
let res = self
.comparison_results(ctx, deployment, block, filter)
.await?;
let res = self.comparison_results(ctx, deployment, block).await?;
let local_info = self.indexer_info(ctx).await?;

let mut ratios = vec![];
for r in res {
// Double check for local attestations to ensure there will be no divide by 0 during the ratio
let local_attestation = if let Some(local_attestation) = r.local_attestation {
local_attestation
} else {
continue;
};
let local_npoi = local_attestation.npoi.clone();

// Aggregate remote attestations with the local attestations
let mut aggregated_attestations: Vec<Attestation> = vec![];
for a in r.attestations {
if a.npoi == local_attestation.npoi {
Expand Down Expand Up @@ -270,6 +240,71 @@ impl POIRadioContext {
self.persisted_state.remote_messages()
}

pub fn remote_messages_filtered(
&self,
identifier: &Option<String>,
block: &Option<u64>,
) -> Vec<GraphcastMessage<RadioPayloadMessage>> {
let msgs = self.remote_messages();
let filtered = msgs
.iter()
.cloned()
.filter(|message| filter_remote_messages(message, identifier, block))
.collect::<Vec<_>>();
filtered
}

pub fn comparison_result(&self, identifier: String) -> Option<ComparisonResult> {
let cmp_results = self.persisted_state.comparison_results();
cmp_results.get(&identifier).cloned()
}

pub async fn comparison_results(
&self,
identifier: Option<String>,
block: Option<u64>,
) -> Vec<ComparisonResult> {
// Simply take from persisted state if block is not specified
if block.is_none() {
let cmp_results = self.persisted_state.comparison_results();

cmp_results
.iter()
.filter(|&(deployment, _)| {
identifier.is_none() | (Some(deployment.clone()) == identifier)
})
.map(|(_, cmp_res)| cmp_res.clone())
.collect::<Vec<ComparisonResult>>()
} else {
// Calculate for the block if specified
let locals = attestations_to_vec(&self.local_attestations(identifier.clone(), block));

let config = self.radio_config();
let network_subgraph = config.network_subgraph.clone();

let mut res = vec![];
for entry in locals {
let deployment_identifier = entry.deployment.clone();
let msgs = self.remote_messages_filtered(&identifier, &block);
let remote_attestations = process_messages(msgs, &network_subgraph)
.await
.ok()
.and_then(|r| {
r.get(&deployment_identifier)
.and_then(|deployment_attestations| {
deployment_attestations.get(&entry.block_number).cloned()
})
})
.unwrap_or_default();

let r = compare_attestation(entry, remote_attestations);
res.push(r);
}

res
}
}

pub fn radio_config(&self) -> Config {
self.radio_config.clone()
}
Expand Down
9 changes: 9 additions & 0 deletions poi-radio/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ impl PersistedState {
self.comparison_results.lock().unwrap().clone()
}

/// Getter for comparison result
pub fn comparison_result(&self, deployment: String) -> Option<ComparisonResult> {
self.comparison_results
.lock()
.unwrap()
.get(&deployment)
.cloned()
}

/// Update local_attestations
pub async fn update_local(&mut self, local_attestations: Local) {
self.local_attestations = local_attestations;
Expand Down

0 comments on commit 2d57919

Please sign in to comment.