Skip to content

Commit

Permalink
Merge pull request #68 from guardian/pm-remove-sns
Browse files Browse the repository at this point in the history
Remove SNS
  • Loading branch information
marjisound committed Mar 25, 2024
2 parents c79d88a + 0b85e34 commit c29ba23
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 104 deletions.
6 changes: 3 additions & 3 deletions packages/backend-common/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export interface TranscriptionConfig {
emailNotificationFromAddress: string;
sourceMediaBucket: string;
transcriptionOutputBucket: string;
destinationTopicArns: {
destinationQueueUrls: {
transcriptionService: string;
};
tableName: string;
Expand Down Expand Up @@ -79,7 +79,7 @@ export const getConfig = async (): Promise<TranscriptionConfig> => {
const destinationTopic = findParameter(
parameters,
paramPath,
'destinationTopicArns/transcriptionService',
'destinationQueueUrls/transcriptionService',
);
// AWS clients take an optional 'endpoint' property that is only needed by localstack - on code/prod you don't need
// to set it. Here we inder the endpoint (http://localhost:4566) from the sqs url
Expand Down Expand Up @@ -131,7 +131,7 @@ export const getConfig = async (): Promise<TranscriptionConfig> => {
stage,
sourceMediaBucket,
emailNotificationFromAddress,
destinationTopicArns: {
destinationQueueUrls: {
transcriptionService: destinationTopic,
},
tableName,
Expand Down
1 change: 0 additions & 1 deletion packages/backend-common/src/configHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ export const getParameters = async (
} while (nextToken);

if (parameters) {
logger.info('Fetched parameters from Parameter Store');
return parameters;
} else {
throw new Error('No parameters fetched from Parameter Store');
Expand Down
17 changes: 16 additions & 1 deletion packages/backend-common/src/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
DestinationService,
TranscriptionJob,
LanguageCode,
TranscriptionOutput,
} from '@guardian/transcription-service-common';
import { getSignedUploadUrl } from '@guardian/transcription-service-backend-common';
import { logger } from '@guardian/transcription-service-backend-common';
Expand Down Expand Up @@ -94,12 +95,18 @@ const sendMessage = async (
messageBody: string,
id: string,
): Promise<SendResult> => {
const fifo = queueUrl.includes('.fifo');
const fifoProperties = fifo
? {
MessageGroupId: id,
}
: {};
try {
const result = await client.send(
new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: messageBody,
MessageGroupId: id,
...fifoProperties,
}),
);
logger.info(`Message sent. Message id: ${result.MessageId}`);
Expand All @@ -124,6 +131,14 @@ const sendMessage = async (
}
};

export const publishTranscriptionOutput = async (
client: SQSClient,
queueUrl: string,
output: TranscriptionOutput,
) => {
await sendMessage(client, queueUrl, JSON.stringify(output), output.id);
};

export const changeMessageVisibility = async (
client: SQSClient,
queueUrl: string,
Expand Down
22 changes: 5 additions & 17 deletions packages/cdk/lib/transcription-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ import { Runtime } from 'aws-cdk-lib/aws-lambda';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { HttpMethods } from 'aws-cdk-lib/aws-s3';
import { Topic } from 'aws-cdk-lib/aws-sns';
import { SqsSubscription } from 'aws-cdk-lib/aws-sns-subscriptions';
import { Queue } from 'aws-cdk-lib/aws-sqs';

export class TranscriptionService extends GuStack {
Expand Down Expand Up @@ -243,11 +242,11 @@ export class TranscriptionService extends GuStack {
});

// worker output infrastructure
const transcriptDestinationTopic = new Topic(
const transcriptionOutputQueue = new Queue(
this,
'TranscriptDestinationTopic',
`${APP_NAME}-output-queue`,
{
topicName: `transcription-service-destination-topic-${props.stage}`,
queueName: `${APP_NAME}-output-queue-${this.stage}`,
},
);

Expand Down Expand Up @@ -282,8 +281,8 @@ export class TranscriptionService extends GuStack {
statements: [getParametersPolicy],
}),
new GuAllowPolicy(this, 'WriteToDestinationTopic', {
actions: ['sns:Publish'],
resources: [transcriptDestinationTopic.topicArn],
actions: ['sqs:SendMessage'],
resources: [transcriptionOutputQueue.queueArn],
}),
new GuAllowPolicy(this, 'WriteToELK', {
actions: [
Expand Down Expand Up @@ -494,17 +493,6 @@ export class TranscriptionService extends GuStack {
transcriptTable.grantReadWriteData(outputHandlerLambda);
transcriptTable.grantReadWriteData(apiLambda);

const transcriptionOutputQueue = new Queue(
this,
`${APP_NAME}-output-queue`,
{
queueName: `${APP_NAME}-output-queue-${this.stage}`,
},
);
transcriptDestinationTopic.addSubscription(
new SqsSubscription(transcriptionOutputQueue),
);

// trigger output-handler lambda from queue
outputHandlerLambda.addEventSource(
new SqsEventSource(transcriptionOutputQueue),
Expand Down
7 changes: 5 additions & 2 deletions packages/output-handler/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,15 @@ const processMessage = async (event: unknown) => {

const parsedEvent = IncomingSQSEvent.safeParse(event);
if (!parsedEvent.success) {
logger.error(`Failed to parse SQS message ${parsedEvent.error.message}`);
logger.error(
`Failed to parse SQS message ${parsedEvent.error.message} + ${JSON.stringify(event)}`,
event,
);
throw new Error('Failed to parse SQS message');
}

for (const record of parsedEvent.data.Records) {
const transcriptionOutput = record.body.Message;
const transcriptionOutput = record.body;
if (transcriptionOutputIsSuccess(transcriptionOutput)) {
await handleTranscriptionSuccess(
config,
Expand Down
11 changes: 3 additions & 8 deletions packages/output-handler/src/sqs-event-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,10 @@ import { stringToJSONSchema } from './zod-string-to-json';
import { TranscriptionOutput } from '@guardian/transcription-service-common';

const SQSMessageBody = z.object({
MessageId: z.string(),
Timestamp: z.string(),
Message: stringToJSONSchema.pipe(TranscriptionOutput),
messageId: z.string(),
body: stringToJSONSchema.pipe(TranscriptionOutput),
});

export const IncomingSQSEvent = z.object({
Records: z.array(
z.object({
body: stringToJSONSchema.pipe(SQSMessageBody),
}),
),
Records: z.array(SQSMessageBody),
});
38 changes: 13 additions & 25 deletions packages/worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import {
TranscriptionConfig,
moveMessageToDeadLetterQueue,
logger,
publishTranscriptionOutput,
} from '@guardian/transcription-service-backend-common';
import {
OutputBucketKeys,
TranscriptionJob,
TranscriptionOutputFailure,
type TranscriptionOutputSuccess,
} from '@guardian/transcription-service-common';
import { getSNSClient, publishTranscriptionOutput } from './sns';
import {
getTranscriptionText,
convertToWav,
Expand All @@ -32,7 +32,6 @@ import {
FailureMetric,
} from '@guardian/transcription-service-backend-common/src/metrics';
import { SQSClient } from '@aws-sdk/client-sqs';
import { SNSClient } from '@aws-sdk/client-sns';
import { setTimeout } from 'timers/promises';
import { MAX_RECEIVE_COUNT } from '@guardian/transcription-service-common';
import { checkSpotInterrupt } from './spot-termination';
Expand All @@ -59,33 +58,23 @@ const main = async () => {
config.aws.localstackEndpoint,
);

const snsClient = getSNSClient(
config.aws.region,
config.aws.localstackEndpoint,
);

if (config.app.stage !== 'DEV') {
// start job to regularly check the instance interruption
// start job to regularly check the instance interruption (Note: deliberately not using await here so the job
// runs in the background)
checkSpotInterrupt(sqsClient, config.app.taskQueueUrl);
}

let pollCount = 0;
// keep polling unless instance is scheduled for termination
while (!INTERRUPTION_TIME) {
pollCount += 1;
await pollTranscriptionQueue(
pollCount,
sqsClient,
snsClient,
metrics,
config,
);
await pollTranscriptionQueue(pollCount, sqsClient, metrics, config);
await setTimeout(POLLING_INTERVAL_SECONDS * 1000);
}
};

const publishTranscriptionOutputFailure = async (
snsClient: SNSClient,
sqsClient: SQSClient,
destination: string,
job: TranscriptionJob,
) => {
Expand All @@ -97,7 +86,7 @@ const publishTranscriptionOutputFailure = async (
originalFilename: job.originalFilename,
};
try {
await publishTranscriptionOutput(snsClient, destination, failureMessage);
await publishTranscriptionOutput(sqsClient, destination, failureMessage);
} catch (e) {
logger.error('error publishing transcription output failed', e);
}
Expand All @@ -106,7 +95,6 @@ const publishTranscriptionOutputFailure = async (
const pollTranscriptionQueue = async (
pollCount: number,
sqsClient: SQSClient,
snsClient: SNSClient,
metrics: MetricsService,
config: TranscriptionConfig,
) => {
Expand Down Expand Up @@ -208,8 +196,8 @@ const pollTranscriptionQueue = async (
logger.info('skip moving message to dead letter queue in DEV');
}
await publishTranscriptionOutputFailure(
snsClient,
config.app.destinationTopicArns.transcriptionService,
sqsClient,
config.app.destinationQueueUrls.transcriptionService,
job,
);
return;
Expand Down Expand Up @@ -270,8 +258,8 @@ const pollTranscriptionQueue = async (
};

await publishTranscriptionOutput(
snsClient,
config.app.destinationTopicArns.transcriptionService,
sqsClient,
config.app.destinationQueueUrls.transcriptionService,
transcriptionOutput,
);

Expand Down Expand Up @@ -308,9 +296,9 @@ const pollTranscriptionQueue = async (
taskMessage.Attributes?.ApproximateReceiveCount || defaultReceiveCount,
);
if (receiveCount >= MAX_RECEIVE_COUNT) {
publishTranscriptionOutputFailure(
snsClient,
config.app.destinationTopicArns.transcriptionService,
await publishTranscriptionOutputFailure(
sqsClient,
config.app.destinationQueueUrls.transcriptionService,
job,
);
}
Expand Down
44 changes: 0 additions & 44 deletions packages/worker/src/sns.ts

This file was deleted.

9 changes: 6 additions & 3 deletions scripts/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set -e

SCRIPT_PATH=$( cd $(dirname $0) ; pwd -P )
APP_NAME="transcription-service"


npm install

Expand All @@ -23,16 +23,19 @@ fi

# Starting localstack
docker-compose up -d
APP_NAME="transcription-service"
# If the queue already exists this command appears to still work and returns the existing queue url
QUEUE_URL=$(aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name=$APP_NAME-task-queue-DEV.fifo --attributes "FifoQueue=true,ContentBasedDeduplication=true" | jq .QueueUrl)
# We don't install the localstack dns so need to replace the endpoint with localhost
QUEUE_URL_LOCALHOST=${QUEUE_URL/sqs.eu-west-1.localhost.localstack.cloud/localhost}

echo "Created queue in localstack, url: ${QUEUE_URL_LOCALHOST}"

TOPIC_ARN=$(aws --endpoint-url=http://localhost:4566 sns create-topic --name $APP_NAME-destination-topic-DEV | jq .TopicArn)
OUTPUT_QUEUE_URL=$(aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name=$APP_NAME-output-queue-DEV | jq .QueueUrl)
# We don't install the localstack dns so need to replace the endpoint with localhost
OUTPUT_QUEUE_URL_LOCALHOST=${OUTPUT_QUEUE_URL/sqs.eu-west-1.localhost.localstack.cloud/localhost}

echo "Created topic in localstack, arn: ${TOPIC_ARN}"
echo "Created queue in localstack, url: ${OUTPUT_QUEUE_URL_LOCALHOST}"

DYNAMODB_ARN=$(aws --endpoint-url=http://localhost:4566 dynamodb create-table \
--table-name ${APP_NAME}-DEV \
Expand Down

0 comments on commit c29ba23

Please sign in to comment.