Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: validate ts client predicates before registering #639

Merged
merged 2 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions components/client/typescript/src/schemas/predicate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,72 @@ export const PredicateSchema = Type.Composite([
}),
]);
export type Predicate = Static<typeof PredicateSchema>;

export const PredicateExpiredDataSchema = Type.Object({
expired_at_block_height: Type.Integer(),
last_evaluated_block_height: Type.Integer(),
last_occurrence: Type.Optional(Type.Integer()),
number_of_blocks_evaluated: Type.Integer(),
number_of_times_triggered: Type.Integer(),
});
export type PredicateExpiredData = Static<typeof PredicateExpiredDataSchema>;

export const PredicateStatusSchema = Type.Union([
Type.Object({
info: Type.Object({
number_of_blocks_to_scan: Type.Integer(),
number_of_blocks_evaluated: Type.Integer(),
number_of_times_triggered: Type.Integer(),
last_occurrence: Type.Optional(Type.Integer()),
last_evaluated_block_height: Type.Integer(),
}),
type: Type.Literal('scanning'),
}),
Type.Object({
info: Type.Object({
last_occurrence: Type.Optional(Type.Integer()),
last_evaluation: Type.Integer(),
number_of_times_triggered: Type.Integer(),
number_of_blocks_evaluated: Type.Integer(),
last_evaluated_block_height: Type.Integer(),
}),
type: Type.Literal('streaming'),
}),
Type.Object({
info: PredicateExpiredDataSchema,
type: Type.Literal('unconfirmed_expiration'),
}),
Type.Object({
info: PredicateExpiredDataSchema,
type: Type.Literal('confirmed_expiration'),
}),
Type.Object({
info: Type.String(),
type: Type.Literal('interrupted'),
}),
Type.Object({
type: Type.Literal('new'),
}),
]);
export type PredicateStatus = Static<typeof PredicateStatusSchema>;

export const SerializedPredicateSchema = Type.Object({
chain: Type.Union([Type.Literal('stacks'), Type.Literal('bitcoin')]),
uuid: Type.String(),
network: Type.Union([Type.Literal('mainnet'), Type.Literal('testnet')]),
predicate: Type.Any(),
status: PredicateStatusSchema,
enabled: Type.Boolean(),
});
export type SerializedPredicate = Static<typeof SerializedPredicateSchema>;

export const SerializedPredicateResponseSchema = Type.Union([
Type.Object({
status: Type.Literal(404),
}),
Type.Object({
result: SerializedPredicateSchema,
status: Type.Literal(200),
}),
]);
export type SerializedPredicateResponse = Static<typeof SerializedPredicateResponseSchema>;
114 changes: 77 additions & 37 deletions components/client/typescript/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@ import { request } from 'undici';
import { logger, PINO_CONFIG } from './util/logger';
import { timeout } from './util/helpers';
import { Payload, PayloadSchema } from './schemas/payload';
import { Predicate, PredicateHeaderSchema, ThenThatHttpPost } from './schemas/predicate';
import {
Predicate,
PredicateHeaderSchema,
SerializedPredicate,
SerializedPredicateResponse,
ThenThatHttpPost,
} from './schemas/predicate';
import { BitcoinIfThisOptionsSchema, BitcoinIfThisSchema } from './schemas/bitcoin/if_this';
import { StacksIfThisOptionsSchema, StacksIfThisSchema } from './schemas/stacks/if_this';

Expand Down Expand Up @@ -104,9 +110,7 @@ export async function buildServer(
callback: OnEventCallback
) {
async function waitForNode(this: FastifyInstance) {
logger.info(
`ChainhookEventObserver connecting to chainhook node at ${chainhookOpts.base_url}...`
);
logger.info(`ChainhookEventObserver looking for chainhook node at ${chainhookOpts.base_url}`);
while (true) {
try {
await request(`${chainhookOpts.base_url}/ping`, { method: 'GET', throwOnError: true });
Expand All @@ -118,16 +122,61 @@ export async function buildServer(
}
}

async function registerPredicates(this: FastifyInstance) {
async function isPredicateActive(predicate: ServerPredicate): Promise<boolean | undefined> {
try {
const result = await request(`${chainhookOpts.base_url}/v1/chainhooks/${predicate.uuid}`, {
method: 'GET',
headers: { accept: 'application/json' },
throwOnError: true,
});
const response = (await result.body.json()) as SerializedPredicateResponse;
if (response.status == 404) return undefined;
if (
response.result.enabled == false ||
response.result.status.type == 'interrupted' ||
response.result.status.type == 'unconfirmed_expiration' ||
response.result.status.type == 'confirmed_expiration'
) {
return false;
}
return true;
} catch (error) {
logger.error(
error,
`ChainhookEventObserver unable to check if predicate ${predicate.uuid} is active`
);
return false;
}
}

async function registerAllPredicates(this: FastifyInstance) {
logger.info(predicates, `ChainhookEventObserver connected to ${chainhookOpts.base_url}`);
if (predicates.length === 0) {
logger.info(`ChainhookEventObserver does not have predicates to register`);
return;
}
const nodeType = serverOpts.node_type ?? 'chainhook';
const path = nodeType === 'chainhook' ? `/v1/chainhooks` : `/v1/observers`;
const registerUrl = `${chainhookOpts.base_url}${path}`;
logger.info(predicates, `ChainhookEventObserver registering predicates at ${registerUrl}`);
for (const predicate of predicates) {
if (nodeType === 'chainhook') {
switch (await isPredicateActive(predicate)) {
case undefined:
// Predicate doesn't exist.
break;
case true:
logger.info(
`ChainhookEventObserver predicate ${predicate.uuid} is already active, skipping registration`
);
continue;
case false:
logger.info(
`ChainhookEventObserver predicate ${predicate.uuid} was being used but is now inactive, updating`
rafaelcr marked this conversation as resolved.
Show resolved Hide resolved
);
await removePredicate(predicate);
}
}
logger.info(`ChainhookEventObserver registering predicate ${predicate.uuid}`);
const thenThat: ThenThatHttpPost = {
http_post: {
url: `${serverOpts.external_base_url}/payload`,
Expand All @@ -144,46 +193,37 @@ export async function buildServer(
headers: { 'content-type': 'application/json' },
throwOnError: true,
});
logger.info(
`ChainhookEventObserver registered '${predicate.name}' predicate (${predicate.uuid})`
);
} catch (error) {
logger.error(error, `ChainhookEventObserver unable to register predicate`);
}
}
}

async function removePredicates(this: FastifyInstance) {
async function removePredicate(predicate: ServerPredicate): Promise<void> {
const nodeType = serverOpts.node_type ?? 'chainhook';
const path =
nodeType === 'chainhook'
? `/v1/chainhooks/${predicate.chain}/${encodeURIComponent(predicate.uuid)}`
: `/v1/observers/${encodeURIComponent(predicate.uuid)}`;
try {
await request(`${chainhookOpts.base_url}${path}`, {
method: 'DELETE',
headers: { 'content-type': 'application/json' },
throwOnError: true,
});
logger.info(`ChainhookEventObserver removed predicate ${predicate.uuid}`);
} catch (error) {
logger.error(error, `ChainhookEventObserver unable to deregister predicate`);
}
}

async function removeAllPredicates(this: FastifyInstance) {
if (predicates.length === 0) {
logger.info(`ChainhookEventObserver does not have predicates to close`);
return;
}
logger.info(`ChainhookEventObserver closing predicates at ${chainhookOpts.base_url}`);
const nodeType = serverOpts.node_type ?? 'chainhook';
const removals = predicates.map(
predicate =>
new Promise<void>((resolve, reject) => {
const path =
nodeType === 'chainhook'
? `/v1/chainhooks/${predicate.chain}/${encodeURIComponent(predicate.uuid)}`
: `/v1/observers/${encodeURIComponent(predicate.uuid)}`;
request(`${chainhookOpts.base_url}${path}`, {
method: 'DELETE',
headers: { 'content-type': 'application/json' },
throwOnError: true,
})
.then(() => {
logger.info(
`ChainhookEventObserver removed '${predicate.name}' predicate (${predicate.uuid})`
);
resolve();
})
.catch(error => {
logger.error(error, `ChainhookEventObserver unable to deregister predicate`);
reject(error);
});
})
);
const removals = predicates.map(predicate => removePredicate(predicate));
await Promise.allSettled(removals);
}

Expand Down Expand Up @@ -242,8 +282,8 @@ export async function buildServer(
if (serverOpts.wait_for_chainhook_node ?? true) {
fastify.addHook('onReady', waitForNode);
}
fastify.addHook('onReady', registerPredicates);
fastify.addHook('onClose', removePredicates);
fastify.addHook('onReady', registerAllPredicates);
fastify.addHook('onClose', removeAllPredicates);

await fastify.register(ChainhookEventObserver);
return fastify;
Expand Down
Loading