Skip to content

Commit

Permalink
POC
Browse files Browse the repository at this point in the history
  • Loading branch information
samchungy committed Apr 3, 2024
1 parent 7ccf0f5 commit 2cbc92b
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 25 deletions.
43 changes: 20 additions & 23 deletions template/lambda-sqs-worker/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import 'skuba-dive/register';

import type { SQSEvent } from 'aws-lambda';
import type { SQSEvent, SQSBatchResponse } from 'aws-lambda';

import { createHandler } from 'src/framework/handler';
import { createBatchSQSHandler, createHandler } from 'src/framework/handler';
import { logger } from 'src/framework/logging';
import { metricsClient } from 'src/framework/metrics';
import { validateJson } from 'src/framework/validation';
Expand All @@ -17,27 +17,8 @@ const smokeTest = async () => {
await Promise.all([scoringService.smokeTest(), sendPipelineEvent({}, true)]);
};

export const handler = createHandler<SQSEvent>(async (event) => {
// Treat an empty object as our smoke test event.
if (!Object.keys(event).length) {
logger.debug('Received smoke test request');
return smokeTest();
}

const count = event.Records.length;

if (count !== 1) {
throw Error(`Received ${count} records`);
}

logger.debug({ count }, 'Received jobs');

metricsClient.distribution('job.received', event.Records.length);

const record = event.Records[0];
if (!record) {
throw new Error('Malformed SQS event with no records');
}
const recordHandler = createBatchSQSHandler(async (record, _ctx) => {
metricsClient.distribution('job.received', 1);

const { body } = record;

Expand All @@ -55,3 +36,19 @@ export const handler = createHandler<SQSEvent>(async (event) => {

metricsClient.distribution('job.scored', 1);
});

export const handler = createHandler<SQSEvent>(
async (event, ctx): Promise<SQSBatchResponse> => {
// Treat an empty object as our smoke test event.
if (!Object.keys(event).length) {
logger.debug('Received smoke test request');
return smokeTest();
}

const count = event.Records.length;

logger.debug({ count }, 'Received jobs');

return recordHandler(event, ctx);
},
);
41 changes: 39 additions & 2 deletions template/lambda-sqs-worker/src/framework/handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { datadog } from 'datadog-lambda-js';

import {
SQSRecord,
SQSEvent,
SQSBatchResponse,
SQSBatchItemFailure,
} from 'aws-lambda';
import { config } from 'src/config';
import { logger, loggerContext } from 'src/framework/logging';

Expand All @@ -24,7 +29,7 @@ const withDatadog = <Event, Output = unknown>(
config.metrics ? (datadog(fn) as Handler<Event, Output>) : fn;

export const createHandler = <Event, Output = unknown>(
fn: (event: Event) => Promise<Output>,
fn: (event: Event, ctx: LambdaContext) => Promise<Output>,
) =>
withDatadog<Event>((event, { awsRequestId }) =>
loggerContext.run({ awsRequestId }, async () => {
Expand All @@ -41,3 +46,35 @@ export const createHandler = <Event, Output = unknown>(
}
}),
);

export const createBatchSQSHandler =
(
fn: (record: SQSRecord, ctx: LambdaContext) => Promise<unknown>,
): Handler<SQSEvent, SQSBatchResponse> =>
async (event, ctx) => {
const processRecord = async (
record: SQSRecord,
): Promise<SQSBatchItemFailure | undefined> =>
loggerContext.run(
{ awsRequestId: ctx.awsRequestId, messageId: record.messageId },
async () => {
try {
await fn(record, ctx);
return;
} catch (err) {
logger.error({ err }, 'Processing record failed');
return {
itemIdentifier: record.messageId,
};
}
},
);

const results = await Promise.all(event.Records.map(processRecord));

return {
batchItemFailures: results.filter((item): item is SQSBatchItemFailure =>
Boolean(item),
),
};
};
1 change: 1 addition & 0 deletions template/lambda-sqs-worker/src/framework/logging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { config } from 'src/config';

interface LoggerContext {
awsRequestId: string;
messageId: string;
}

export const loggerContext = new AsyncLocalStorage<LoggerContext>();
Expand Down

0 comments on commit 2cbc92b

Please sign in to comment.