Skip to content

Commit

Permalink
Merge pull request #30 from guardian/sbd-add-dynamodb
Browse files Browse the repository at this point in the history
Add DynamoDB table and use it to store transcription data within output-handler lambda
  • Loading branch information
srbd committed Feb 13, 2024
2 parents 5f7f748 + 60493c0 commit ef75b99
Show file tree
Hide file tree
Showing 11 changed files with 821 additions and 308 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ node_modules/
build
**.js
localstack/
**/package-lock.json
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ services:
environment:
# LocalStack configuration: https://docs.localstack.cloud/references/configuration/
- DEBUG=${DEBUG:-0}
- SERVICES=sqs,sns
- SERVICES=sqs,sns,dynamodb
- PERSISTENCE=1
volumes:
- '${LOCALSTACK_VOLUME_DIR:-./localstack}:/var/lib/localstack'
Expand Down
991 changes: 689 additions & 302 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"api::package": "npm run package --workspace api",
"api::start": "AWS_REGION=eu-west-1 STAGE=DEV npm run start --workspace api",
"output-handler::build": "npm run build --workspace output-handler",
"output-handler::start": "npm run start --workspace output-handler",
"output-handler::start": "AWS_REGION=eu-west-1 STAGE=DEV npm run start --workspace output-handler",
"worker::build": "npm run build --workspace worker; npm run build --workspace worker",
"worker::package": "npm run package --workspace worker",
"worker::start": "AWS_REGION=eu-west-1 STAGE=DEV npm run start --workspace worker",
Expand Down
4 changes: 3 additions & 1 deletion packages/backend-common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
"dependencies": {
"@aws-sdk/client-ssm": "^3.496.0",
"@aws-sdk/client-sqs": "^3.496.0",
"@aws-sdk/client-s3": "^3.496.0"
"@aws-sdk/client-s3": "^3.496.0",
"@aws-sdk/client-dynamodb": "^3.509.0",
"@aws-sdk/lib-dynamodb": "^3.509.0"
},
"devDependencies": {},
"private": true,
Expand Down
4 changes: 4 additions & 0 deletions packages/backend-common/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export interface TranscriptionConfig {
destinationTopicArns: {
transcriptionService: string;
};
tableName: string;
};
aws: {
region: string;
Expand Down Expand Up @@ -102,6 +103,8 @@ export const getConfig = async (): Promise<TranscriptionConfig> => {
'app/sourceMediaBucket',
);

const tableName = findParameter(parameters, paramPath, 'app/tableName');

return {
auth: {
clientId: authClientId,
Expand All @@ -117,6 +120,7 @@ export const getConfig = async (): Promise<TranscriptionConfig> => {
destinationTopicArns: {
transcriptionService: destinationTopic,
},
tableName,
},
aws: {
region,
Expand Down
53 changes: 53 additions & 0 deletions packages/backend-common/src/dynamodb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { PutCommand, DynamoDBDocumentClient } from '@aws-sdk/lib-dynamodb';

import { z } from 'zod';

export const getDynamoClient = (
region: string,
localstackEndpoint?: string,
) => {
const clientBaseConfig = {
region,
};

const clientConfig = localstackEndpoint
? { ...clientBaseConfig, endpoint: localstackEndpoint }
: clientBaseConfig;

const client = new DynamoDBClient(clientConfig);
return DynamoDBDocumentClient.from(client);
};

export const Transcript = z.object({
srt: z.string(),
text: z.string(),
json: z.string(),
});
export const TranscriptionItem = z.object({
id: z.string(),
originalFilename: z.string(),
transcript: Transcript,
userEmail: z.string(),
});

export type TranscriptionItem = z.infer<typeof TranscriptionItem>;

export const writeTranscriptionItem = async (
client: DynamoDBDocumentClient,
tableName: string,
item: TranscriptionItem,
) => {
const command = new PutCommand({
TableName: tableName,
Item: item,
});

try {
await client.send(command);
console.log(`saved to db item ${item.id}`);
} catch (error) {
console.error('error writing to db', error);
throw error;
}
};
13 changes: 13 additions & 0 deletions packages/cdk/lib/transcription-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
GroupMetrics,
SpotAllocationStrategy,
} from 'aws-cdk-lib/aws-autoscaling';
import { AttributeType, Table } from 'aws-cdk-lib/aws-dynamodb';
import {
InstanceClass,
InstanceSize,
Expand Down Expand Up @@ -356,6 +357,16 @@ export class TranscriptionService extends GuStack {
// allow worker to receive message from queue
transcriptionTaskQueue.grantConsumeMessages(transcriptionWorkerASG);

const transcriptTable = new Table(this, 'TranscriptTable', {
tableName: `${APP_NAME}-${this.stage}`,
partitionKey: {
name: 'id',
type: AttributeType.STRING,
},
readCapacity: 1,
writeCapacity: 1,
});

const outputHandlerLambda = new GuLambdaFunction(
this,
'transcription-service-output-handler',
Expand All @@ -367,6 +378,8 @@ export class TranscriptionService extends GuStack {
},
);

transcriptTable.grantReadWriteData(outputHandlerLambda);

const transcriptionOutputQueue = new Queue(
this,
`${APP_NAME}-output-queue`,
Expand Down
33 changes: 32 additions & 1 deletion packages/output-handler/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ import { Handler } from 'aws-lambda';
import { sendEmail, getSESClient } from './ses';
import { IncomingSQSEvent } from './sqs-event-types';
import { getConfig } from '@guardian/transcription-service-backend-common';
import {
getDynamoClient,
TranscriptionItem,
writeTranscriptionItem,
} from '@guardian/transcription-service-backend-common/src/dynamodb';
import { testMessage } from '../test/testMessage';

const messageBody = (
transcriptId: string,
Expand All @@ -21,7 +27,7 @@ const messageBody = (
`;
};

const handler: Handler = async (event) => {
const processMessage = async (event: unknown) => {
const config = await getConfig();
const sesClient = getSESClient(config.aws.region);

Expand All @@ -33,6 +39,24 @@ const handler: Handler = async (event) => {

for (const record of parsedEvent.data.Records) {
const transcriptionOutput = record.body.Message;

const dynamoItem: TranscriptionItem = {
id: transcriptionOutput.id,
originalFilename: transcriptionOutput.originalFilename,
transcript: {
srt: transcriptionOutput.transcriptionSrt,
text: '',
json: '',
},
userEmail: transcriptionOutput.userEmail,
};

await writeTranscriptionItem(
getDynamoClient(config.aws.region, config.aws.localstackEndpoint),
config.app.tableName,
dynamoItem,
);

await sendEmail(
sesClient,
config.app.emailNotificationFromAddress,
Expand All @@ -46,8 +70,15 @@ const handler: Handler = async (event) => {
),
);
}
};

const handler: Handler = async (event) => {
await processMessage(event);
return 'Finished processing Event';
};

// when running locally bypass the handler
if (!process.env['AWS_EXECUTION_ENV']) {
processMessage(testMessage);
}
export { handler as outputHandler };
8 changes: 8 additions & 0 deletions packages/output-handler/test/testMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export const testMessage = {
Records: [
{
messageId: 'abc123',
body: '{\n "Type" : "Notification",\n "MessageId" : "message-id",\n "TopicArn" : "mytopicarn",\n "Message" : "{\\"id\\":\\"my-first-transcription\\",\\"transcriptionSrt\\":\\"1\\\\n00:00:00,000 --> 00:00:01,300\\\\n This is The Guardian.\\\\n\\\\n2\\\\n00:00:01,300 --> 00:00:06,300\\\\n [Music]\\\\n\\\\n\\",\\"languageCode\\":\\"en\\",\\"userEmail\\":\\"test@test.com\\",\\"originalFilename\\":\\"test.mp3\\"}",\n "Timestamp" : "2024-02-08T11:10:11.014Z"\n}',
},
],
};
18 changes: 16 additions & 2 deletions scripts/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
set -e

SCRIPT_PATH=$( cd $(dirname $0) ; pwd -P )
APP_NAME="transcription-service"

npm install

Expand All @@ -23,12 +24,25 @@ fi
# Starting localstack
docker-compose up -d
# If the queue already exists this command appears to still work and returns the existing queue url
QUEUE_URL=$(aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name=transcription-service-task-queue-DEV.fifo --attributes "FifoQueue=true,ContentBasedDeduplication=true" | jq .QueueUrl)
QUEUE_URL=$(aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name=$APP_NAME-task-queue-DEV.fifo --attributes "FifoQueue=true,ContentBasedDeduplication=true" | jq .QueueUrl)
# We don't install the localstack dns so need to replace the endpoint with localhost
QUEUE_URL_LOCALHOST=${QUEUE_URL/sqs.eu-west-1.localhost.localstack.cloud/localhost}

echo "Created queue in localstack, url: ${QUEUE_URL_LOCALHOST}"

TOPIC_ARN=$(aws --endpoint-url=http://localhost:4566 sns create-topic --name transcription-service-destination-topic-DEV | jq .TopicArn)
TOPIC_ARN=$(aws --endpoint-url=http://localhost:4566 sns create-topic --name $APP_NAME-destination-topic-DEV | jq .TopicArn)

echo "Created topic in localstack, arn: ${TOPIC_ARN}"

DYNAMODB_ARN=$(aws --endpoint-url=http://localhost:4566 dynamodb create-table \
--table-name ${APP_NAME}-DEV \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
--attribute-definitions AttributeName=id,AttributeType=S \
--key-schema AttributeName=id,KeyType=HASH | jq .TableDescription.TableArn)

echo "Created table, arn: ${DYNAMODB_ARN}"





0 comments on commit ef75b99

Please sign in to comment.