Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 91 additions & 50 deletions lambdas/core-notifier-lambda/src/apis/sqs-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ type EventToPublish = {
rejected?: MessageRequestRejected;
};

type ProcessRecordResult = {
submitted?: MessageRequestSubmitted;
skipped?: MessageRequestSkipped;
rejected?: MessageRequestRejected;
batchItemFailure?: SQSBatchItemFailure;
};

async function handlePdmResourceAvailable(
notifyMessageProcessor: NotifyMessageProcessor,
incoming: PDMResourceAvailable,
Expand Down Expand Up @@ -69,6 +76,65 @@ async function handlePdmResourceAvailable(
return eventToPublish;
}

async function processSqsRecord(
sqsRecord: SQSRecord,
logger: Logger,
senderManagement: ISenderManagement,
notifyMessageProcessor: NotifyMessageProcessor,
): Promise<ProcessRecordResult> {
const result: ProcessRecordResult = {};

let incoming: PDMResourceAvailable | undefined;
let sender: Sender | null = null;

try {
incoming = parseSqsRecord(sqsRecord, logger);
sender = await senderManagement.getSender({
senderId: incoming.data.senderId,
});

if (sender === null) {
throw new Error(
`Sender not found for senderId: ${incoming.data.senderId}`,
);
}

const eventToPublish = await handlePdmResourceAvailable(
notifyMessageProcessor,
incoming,
sender,
);

if (eventToPublish.submitted) {
result.submitted = eventToPublish.submitted;
}
if (eventToPublish.skipped) {
result.skipped = eventToPublish.skipped;
}
} catch (error: any) {
logger.warn({
error: error.message,
description: 'Failed processing message',
messageId: sqsRecord.messageId,
senderId: incoming?.data.senderId,
});

if (error instanceof RequestNotifyError && incoming && sender) {
// CCM-12858 A/C 5: When any other response other than a 201 is returned, don't retry the message
result.rejected = mapPdmEventToMessageRequestRejected(
incoming,
sender,
error.errorCode,
);
} else {
// this might be a transient error so we notify the queue to retry
result.batchItemFailure = { itemIdentifier: sqsRecord.messageId };
}
}

return result;
}

export const createHandler = ({
eventPublisher,
logger,
Expand All @@ -87,58 +153,33 @@ export const createHandler = ({
const submittedEvents: MessageRequestSubmitted[] = [];
const rejectedEvents: MessageRequestRejected[] = [];

let incoming: PDMResourceAvailable;
let sender: Sender | null;

await Promise.all(
sqsEvent.Records.map(async (sqsRecord: SQSRecord) => {
try {
incoming = parseSqsRecord(sqsRecord, logger);
sender = await senderManagement.getSender({
senderId: incoming.data.senderId,
});
if (sender === null) {
throw new Error(
`Sender not found for senderId: ${incoming.data.senderId}`,
);
}

const eventToPublish = await handlePdmResourceAvailable(
notifyMessageProcessor,
incoming,
sender,
);

if (eventToPublish.submitted) {
submittedEvents.push(eventToPublish.submitted);
}
if (eventToPublish.skipped) {
skippedEvents.push(eventToPublish.skipped);
}
} catch (error: any) {
logger.warn({
error: error.message,
description: 'Failed processing message',
messageId: sqsRecord.messageId,
senderId: incoming?.data.senderId,
});
if (error instanceof RequestNotifyError) {
// CCM-12858 A/C 5: When any other response other than a 201 is returned, don't retry the message
rejectedEvents.push(
mapPdmEventToMessageRequestRejected(
incoming,
sender!,
error.errorCode,
),
);
} else {
// this might be a transient error so we notify the queue to retry
batchItemFailures.push({ itemIdentifier: sqsRecord.messageId });
}
}
}),
const results = await Promise.all(
sqsEvent.Records.map((sqsRecord) =>
processSqsRecord(
sqsRecord,
logger,
senderManagement,
notifyMessageProcessor,
),
),
);

// Aggregate results from all processed records
for (const result of results) {
if (result.submitted) {
submittedEvents.push(result.submitted);
}
if (result.skipped) {
skippedEvents.push(result.skipped);
}
if (result.rejected) {
rejectedEvents.push(result.rejected);
}
if (result.batchItemFailure) {
batchItemFailures.push(result.batchItemFailure);
}
}

await Promise.all(
[
submittedEvents.length > 0 &&
Expand Down
Comment thread
nhsd-angel-pastor marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ MarkupSafe==2.1.3
pip==25.3
setuptools==78.1.1
Werkzeug==3.1.4
wheel==0.41.1
wheel==0.46.2
WTForms==3.0.1
Loading