From 3104db9d3144c41054acc4477cf24e0e4318835f Mon Sep 17 00:00:00 2001 From: philmcmahon Date: Fri, 8 Mar 2024 17:25:22 +0000 Subject: [PATCH 1/7] Remove SNS - instead publish directly to SQS --- packages/backend-common/src/sqs.ts | 17 ++++++++- packages/cdk/lib/transcription-service.ts | 22 +++--------- packages/worker/src/index.ts | 27 ++++---------- packages/worker/src/sns.ts | 44 ----------------------- 4 files changed, 28 insertions(+), 82 deletions(-) delete mode 100644 packages/worker/src/sns.ts diff --git a/packages/backend-common/src/sqs.ts b/packages/backend-common/src/sqs.ts index 28b94de2..b8ae01d1 100644 --- a/packages/backend-common/src/sqs.ts +++ b/packages/backend-common/src/sqs.ts @@ -11,6 +11,7 @@ import { DestinationService, TranscriptionJob, LanguageCode, + TranscriptionOutput, } from '@guardian/transcription-service-common'; import { getSignedUploadUrl } from '@guardian/transcription-service-backend-common'; import { logger } from '@guardian/transcription-service-backend-common'; @@ -94,12 +95,18 @@ const sendMessage = async ( messageBody: string, id: string, ): Promise => { + const fifo = queueUrl.includes('.fifo'); + const fifoProperties = fifo + ? { + MessageGroupId: id, + } + : {}; try { const result = await client.send( new SendMessageCommand({ QueueUrl: queueUrl, MessageBody: messageBody, - MessageGroupId: id, + ...fifoProperties, }), ); logger.info(`Message sent. Message id: ${result.MessageId}`); @@ -124,6 +131,14 @@ const sendMessage = async ( } }; +export const publishTranscriptionOutput = async ( + client: SQSClient, + queueUrl: string, + output: TranscriptionOutput, +) => { + await sendMessage(client, queueUrl, JSON.stringify(output), output.id); +}; + export const changeMessageVisibility = async ( client: SQSClient, queueUrl: string, diff --git a/packages/cdk/lib/transcription-service.ts b/packages/cdk/lib/transcription-service.ts index 0ff97ad7..da2c8d43 100644 --- a/packages/cdk/lib/transcription-service.ts +++ b/packages/cdk/lib/transcription-service.ts @@ -63,7 +63,6 @@ import { Runtime } from 'aws-cdk-lib/aws-lambda'; import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources'; import { HttpMethods } from 'aws-cdk-lib/aws-s3'; import { Topic } from 'aws-cdk-lib/aws-sns'; -import { SqsSubscription } from 'aws-cdk-lib/aws-sns-subscriptions'; import { Queue } from 'aws-cdk-lib/aws-sqs'; export class TranscriptionService extends GuStack { @@ -243,11 +242,11 @@ export class TranscriptionService extends GuStack { }); // worker output infrastructure - const transcriptDestinationTopic = new Topic( + const transcriptionOutputQueue = new Queue( this, - 'TranscriptDestinationTopic', + `${APP_NAME}-output-queue`, { - topicName: `transcription-service-destination-topic-${props.stage}`, + queueName: `${APP_NAME}-output-queue-${this.stage}`, }, ); @@ -282,8 +281,8 @@ export class TranscriptionService extends GuStack { statements: [getParametersPolicy], }), new GuAllowPolicy(this, 'WriteToDestinationTopic', { - actions: ['sns:Publish'], - resources: [transcriptDestinationTopic.topicArn], + actions: ['sqs:SendMessage'], + resources: [transcriptionOutputQueue.queueArn], }), new GuAllowPolicy(this, 'WriteToELK', { actions: [ @@ -494,17 +493,6 @@ export class TranscriptionService extends GuStack { transcriptTable.grantReadWriteData(outputHandlerLambda); transcriptTable.grantReadWriteData(apiLambda); - const transcriptionOutputQueue = new Queue( - this, - `${APP_NAME}-output-queue`, - { - queueName: `${APP_NAME}-output-queue-${this.stage}`, - }, - ); - transcriptDestinationTopic.addSubscription( - new SqsSubscription(transcriptionOutputQueue), - ); - // trigger output-handler lambda from queue outputHandlerLambda.addEventSource( new SqsEventSource(transcriptionOutputQueue), diff --git a/packages/worker/src/index.ts b/packages/worker/src/index.ts index b0ef91bb..1d46b1c9 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -10,6 +10,7 @@ import { TranscriptionConfig, moveMessageToDeadLetterQueue, logger, + publishTranscriptionOutput, } from '@guardian/transcription-service-backend-common'; import { OutputBucketKeys, @@ -17,7 +18,6 @@ import { TranscriptionOutputFailure, type TranscriptionOutputSuccess, } from '@guardian/transcription-service-common'; -import { getSNSClient, publishTranscriptionOutput } from './sns'; import { getTranscriptionText, convertToWav, @@ -32,7 +32,6 @@ import { FailureMetric, } from '@guardian/transcription-service-backend-common/src/metrics'; import { SQSClient } from '@aws-sdk/client-sqs'; -import { SNSClient } from '@aws-sdk/client-sns'; import { setTimeout } from 'timers/promises'; import { MAX_RECEIVE_COUNT } from '@guardian/transcription-service-common'; import { checkSpotInterrupt } from './spot-termination'; @@ -59,11 +58,6 @@ const main = async () => { config.aws.localstackEndpoint, ); - const snsClient = getSNSClient( - config.aws.region, - config.aws.localstackEndpoint, - ); - if (config.app.stage !== 'DEV') { // start job to regularly check the instance interruption checkSpotInterrupt(sqsClient, config.app.taskQueueUrl); @@ -73,19 +67,13 @@ const main = async () => { // keep polling unless instance is scheduled for termination while (!INTERRUPTION_TIME) { pollCount += 1; - await pollTranscriptionQueue( - pollCount, - sqsClient, - snsClient, - metrics, - config, - ); + await pollTranscriptionQueue(pollCount, sqsClient, metrics, config); await setTimeout(POLLING_INTERVAL_SECONDS * 1000); } }; const publishTranscriptionOutputFailure = async ( - snsClient: SNSClient, + sqsClient: SQSClient, destination: string, job: TranscriptionJob, ) => { @@ -97,7 +85,7 @@ const publishTranscriptionOutputFailure = async ( originalFilename: job.originalFilename, }; try { - await publishTranscriptionOutput(snsClient, destination, failureMessage); + await publishTranscriptionOutput(sqsClient, destination, failureMessage); } catch (e) { logger.error('error publishing transcription output failed', e); } @@ -106,7 +94,6 @@ const publishTranscriptionOutputFailure = async ( const pollTranscriptionQueue = async ( pollCount: number, sqsClient: SQSClient, - snsClient: SNSClient, metrics: MetricsService, config: TranscriptionConfig, ) => { @@ -208,7 +195,7 @@ const pollTranscriptionQueue = async ( logger.info('skip moving message to dead letter queue in DEV'); } await publishTranscriptionOutputFailure( - snsClient, + sqsClient, config.app.destinationTopicArns.transcriptionService, job, ); @@ -270,7 +257,7 @@ const pollTranscriptionQueue = async ( }; await publishTranscriptionOutput( - snsClient, + sqsClient, config.app.destinationTopicArns.transcriptionService, transcriptionOutput, ); @@ -309,7 +296,7 @@ const pollTranscriptionQueue = async ( ); if (receiveCount >= MAX_RECEIVE_COUNT) { publishTranscriptionOutputFailure( - snsClient, + sqsClient, config.app.destinationTopicArns.transcriptionService, job, ); diff --git a/packages/worker/src/sns.ts b/packages/worker/src/sns.ts deleted file mode 100644 index 942b3101..00000000 --- a/packages/worker/src/sns.ts +++ /dev/null @@ -1,44 +0,0 @@ -import { PublishCommand, SNSClient } from '@aws-sdk/client-sns'; -import { logger } from '@guardian/transcription-service-backend-common/src/logging'; -import { TranscriptionOutput } from '@guardian/transcription-service-common'; - -export const getSNSClient = (region: string, localstackEndpoint?: string) => { - const clientBaseConfig = { - region, - }; - const clientConfig = localstackEndpoint - ? { ...clientBaseConfig, endpoint: localstackEndpoint } - : clientBaseConfig; - - return new SNSClient(clientConfig); -}; - -const publishMessage = async ( - client: SNSClient, - topicArn: string, - message: string, -): Promise => { - try { - const resp = await client.send( - new PublishCommand({ - TopicArn: topicArn, - Message: message, - }), - ); - logger.info( - `message sent with id ${resp.MessageId}, status: ${resp.$metadata.httpStatusCode}`, - ); - return resp.MessageId; - } catch (e) { - logger.error('Error publishing message', e); - throw e; - } -}; - -export const publishTranscriptionOutput = async ( - client: SNSClient, - topicArn: string, - output: TranscriptionOutput, -) => { - await publishMessage(client, topicArn, JSON.stringify(output)); -}; From bde8048110cf9ef37513d937bdb1344b4fe38e22 Mon Sep 17 00:00:00 2001 From: philmcmahon Date: Tue, 12 Mar 2024 12:21:23 +0000 Subject: [PATCH 2/7] Update config property --- packages/backend-common/src/config.ts | 6 +++--- packages/worker/src/index.ts | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/backend-common/src/config.ts b/packages/backend-common/src/config.ts index 9f9c6c26..b8ad4592 100644 --- a/packages/backend-common/src/config.ts +++ b/packages/backend-common/src/config.ts @@ -16,7 +16,7 @@ export interface TranscriptionConfig { emailNotificationFromAddress: string; sourceMediaBucket: string; transcriptionOutputBucket: string; - destinationTopicArns: { + destinationQueueArns: { transcriptionService: string; }; tableName: string; @@ -79,7 +79,7 @@ export const getConfig = async (): Promise => { const destinationTopic = findParameter( parameters, paramPath, - 'destinationTopicArns/transcriptionService', + 'destinationQueueArns/transcriptionService', ); // AWS clients take an optional 'endpoint' property that is only needed by localstack - on code/prod you don't need // to set it. Here we inder the endpoint (http://localhost:4566) from the sqs url @@ -131,7 +131,7 @@ export const getConfig = async (): Promise => { stage, sourceMediaBucket, emailNotificationFromAddress, - destinationTopicArns: { + destinationQueueArns: { transcriptionService: destinationTopic, }, tableName, diff --git a/packages/worker/src/index.ts b/packages/worker/src/index.ts index 1d46b1c9..2f7f3c5e 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -196,7 +196,7 @@ const pollTranscriptionQueue = async ( } await publishTranscriptionOutputFailure( sqsClient, - config.app.destinationTopicArns.transcriptionService, + config.app.destinationQueueArns.transcriptionService, job, ); return; @@ -258,7 +258,7 @@ const pollTranscriptionQueue = async ( await publishTranscriptionOutput( sqsClient, - config.app.destinationTopicArns.transcriptionService, + config.app.destinationQueueArns.transcriptionService, transcriptionOutput, ); @@ -295,9 +295,9 @@ const pollTranscriptionQueue = async ( taskMessage.Attributes?.ApproximateReceiveCount || defaultReceiveCount, ); if (receiveCount >= MAX_RECEIVE_COUNT) { - publishTranscriptionOutputFailure( + await publishTranscriptionOutputFailure( sqsClient, - config.app.destinationTopicArns.transcriptionService, + config.app.destinationQueueArns.transcriptionService, job, ); } From 8cc69dc917298cf596a9db3110e271c723f8438e Mon Sep 17 00:00:00 2001 From: philmcmahon Date: Tue, 12 Mar 2024 12:33:39 +0000 Subject: [PATCH 3/7] Update config, localstack to reflect removed queue --- packages/backend-common/src/config.ts | 6 +++--- packages/worker/src/index.ts | 9 +++++---- scripts/setup.sh | 9 ++++++--- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/packages/backend-common/src/config.ts b/packages/backend-common/src/config.ts index b8ad4592..892cc0dd 100644 --- a/packages/backend-common/src/config.ts +++ b/packages/backend-common/src/config.ts @@ -16,7 +16,7 @@ export interface TranscriptionConfig { emailNotificationFromAddress: string; sourceMediaBucket: string; transcriptionOutputBucket: string; - destinationQueueArns: { + destinationQueueUrls: { transcriptionService: string; }; tableName: string; @@ -79,7 +79,7 @@ export const getConfig = async (): Promise => { const destinationTopic = findParameter( parameters, paramPath, - 'destinationQueueArns/transcriptionService', + 'destinationQueueUrls/transcriptionService', ); // AWS clients take an optional 'endpoint' property that is only needed by localstack - on code/prod you don't need // to set it. Here we inder the endpoint (http://localhost:4566) from the sqs url @@ -131,7 +131,7 @@ export const getConfig = async (): Promise => { stage, sourceMediaBucket, emailNotificationFromAddress, - destinationQueueArns: { + destinationQueueUrls: { transcriptionService: destinationTopic, }, tableName, diff --git a/packages/worker/src/index.ts b/packages/worker/src/index.ts index 2f7f3c5e..919cb29f 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -59,7 +59,8 @@ const main = async () => { ); if (config.app.stage !== 'DEV') { - // start job to regularly check the instance interruption + // start job to regularly check the instance interruption (Note: deliberately not using await here so the job + // runs in the background) checkSpotInterrupt(sqsClient, config.app.taskQueueUrl); } @@ -196,7 +197,7 @@ const pollTranscriptionQueue = async ( } await publishTranscriptionOutputFailure( sqsClient, - config.app.destinationQueueArns.transcriptionService, + config.app.destinationQueueUrls.transcriptionService, job, ); return; @@ -258,7 +259,7 @@ const pollTranscriptionQueue = async ( await publishTranscriptionOutput( sqsClient, - config.app.destinationQueueArns.transcriptionService, + config.app.destinationQueueUrls.transcriptionService, transcriptionOutput, ); @@ -297,7 +298,7 @@ const pollTranscriptionQueue = async ( if (receiveCount >= MAX_RECEIVE_COUNT) { await publishTranscriptionOutputFailure( sqsClient, - config.app.destinationQueueArns.transcriptionService, + config.app.destinationQueueUrls.transcriptionService, job, ); } diff --git a/scripts/setup.sh b/scripts/setup.sh index cc363c38..73b556df 100755 --- a/scripts/setup.sh +++ b/scripts/setup.sh @@ -2,7 +2,7 @@ set -e SCRIPT_PATH=$( cd $(dirname $0) ; pwd -P ) -APP_NAME="transcription-service" + npm install @@ -23,6 +23,7 @@ fi # Starting localstack docker-compose up -d +APP_NAME="transcription-service" # If the queue already exists this command appears to still work and returns the existing queue url QUEUE_URL=$(aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name=$APP_NAME-task-queue-DEV.fifo --attributes "FifoQueue=true,ContentBasedDeduplication=true" | jq .QueueUrl) # We don't install the localstack dns so need to replace the endpoint with localhost @@ -30,9 +31,11 @@ QUEUE_URL_LOCALHOST=${QUEUE_URL/sqs.eu-west-1.localhost.localstack.cloud/localho echo "Created queue in localstack, url: ${QUEUE_URL_LOCALHOST}" -TOPIC_ARN=$(aws --endpoint-url=http://localhost:4566 sns create-topic --name $APP_NAME-destination-topic-DEV | jq .TopicArn) +OUTPUT_QUEUE_URL=$(aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name=$APP_NAME-output-queue-DEV | jq .QueueUrl) +# We don't install the localstack dns so need to replace the endpoint with localhost +OUTPUT_QUEUE_URL_LOCALHOST=${OUTPUT_QUEUE_URL/sqs.eu-west-1.localhost.localstack.cloud/localhost} -echo "Created topic in localstack, arn: ${TOPIC_ARN}" +echo "Created queue in localstack, url: ${OUTPUT_QUEUE_URL_LOCALHOST}" DYNAMODB_ARN=$(aws --endpoint-url=http://localhost:4566 dynamodb create-table \ --table-name ${APP_NAME}-DEV \ From 40e3d0a60686fb743ddc5c8dad75dbd4f2798e98 Mon Sep 17 00:00:00 2001 From: philmcmahon Date: Tue, 12 Mar 2024 12:48:30 +0000 Subject: [PATCH 4/7] Remove excess logging --- packages/backend-common/src/configHelpers.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/backend-common/src/configHelpers.ts b/packages/backend-common/src/configHelpers.ts index 4b0d4624..072300d7 100644 --- a/packages/backend-common/src/configHelpers.ts +++ b/packages/backend-common/src/configHelpers.ts @@ -28,7 +28,6 @@ export const getParameters = async ( } while (nextToken); if (parameters) { - logger.info('Fetched parameters from Parameter Store'); return parameters; } else { throw new Error('No parameters fetched from Parameter Store'); From 0060c08365c268d8577d91ce2a4f939c25a822a7 Mon Sep 17 00:00:00 2001 From: philmcmahon Date: Tue, 12 Mar 2024 19:08:10 +0000 Subject: [PATCH 5/7] Log out full event on parsing failure --- packages/output-handler/src/index.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/output-handler/src/index.ts b/packages/output-handler/src/index.ts index 47808359..9bd64d2d 100644 --- a/packages/output-handler/src/index.ts +++ b/packages/output-handler/src/index.ts @@ -132,7 +132,10 @@ const processMessage = async (event: unknown) => { const parsedEvent = IncomingSQSEvent.safeParse(event); if (!parsedEvent.success) { - logger.error(`Failed to parse SQS message ${parsedEvent.error.message}`); + logger.error( + `Failed to parse SQS message ${parsedEvent.error.message}`, + event, + ); throw new Error('Failed to parse SQS message'); } From ba92230b5ebf14a10893559df7b2a7bed36ff45b Mon Sep 17 00:00:00 2001 From: philmcmahon Date: Thu, 14 Mar 2024 11:34:36 +0000 Subject: [PATCH 6/7] more event --- packages/output-handler/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/output-handler/src/index.ts b/packages/output-handler/src/index.ts index 9bd64d2d..50492783 100644 --- a/packages/output-handler/src/index.ts +++ b/packages/output-handler/src/index.ts @@ -133,7 +133,7 @@ const processMessage = async (event: unknown) => { const parsedEvent = IncomingSQSEvent.safeParse(event); if (!parsedEvent.success) { logger.error( - `Failed to parse SQS message ${parsedEvent.error.message}`, + `Failed to parse SQS message ${parsedEvent.error.message} + ${JSON.stringify(event)}`, event, ); throw new Error('Failed to parse SQS message'); From 0b85e34852f6e99b4310c88260df602c2a117ef5 Mon Sep 17 00:00:00 2001 From: philmcmahon Date: Thu, 14 Mar 2024 18:08:42 +0000 Subject: [PATCH 7/7] Adjust types now that there's no SNS --- packages/output-handler/src/index.ts | 2 +- packages/output-handler/src/sqs-event-types.ts | 11 +++-------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/packages/output-handler/src/index.ts b/packages/output-handler/src/index.ts index 50492783..732700be 100644 --- a/packages/output-handler/src/index.ts +++ b/packages/output-handler/src/index.ts @@ -140,7 +140,7 @@ const processMessage = async (event: unknown) => { } for (const record of parsedEvent.data.Records) { - const transcriptionOutput = record.body.Message; + const transcriptionOutput = record.body; if (transcriptionOutputIsSuccess(transcriptionOutput)) { await handleTranscriptionSuccess( config, diff --git a/packages/output-handler/src/sqs-event-types.ts b/packages/output-handler/src/sqs-event-types.ts index 06994312..c9da0b2e 100644 --- a/packages/output-handler/src/sqs-event-types.ts +++ b/packages/output-handler/src/sqs-event-types.ts @@ -3,15 +3,10 @@ import { stringToJSONSchema } from './zod-string-to-json'; import { TranscriptionOutput } from '@guardian/transcription-service-common'; const SQSMessageBody = z.object({ - MessageId: z.string(), - Timestamp: z.string(), - Message: stringToJSONSchema.pipe(TranscriptionOutput), + messageId: z.string(), + body: stringToJSONSchema.pipe(TranscriptionOutput), }); export const IncomingSQSEvent = z.object({ - Records: z.array( - z.object({ - body: stringToJSONSchema.pipe(SQSMessageBody), - }), - ), + Records: z.array(SQSMessageBody), });