diff --git a/lambdas/core-notifier-lambda/src/apis/sqs-handler.ts b/lambdas/core-notifier-lambda/src/apis/sqs-handler.ts index 82ab8fc4c..eb217bab0 100644 --- a/lambdas/core-notifier-lambda/src/apis/sqs-handler.ts +++ b/lambdas/core-notifier-lambda/src/apis/sqs-handler.ts @@ -39,6 +39,13 @@ type EventToPublish = { rejected?: MessageRequestRejected; }; +type ProcessRecordResult = { + submitted?: MessageRequestSubmitted; + skipped?: MessageRequestSkipped; + rejected?: MessageRequestRejected; + batchItemFailure?: SQSBatchItemFailure; +}; + async function handlePdmResourceAvailable( notifyMessageProcessor: NotifyMessageProcessor, incoming: PDMResourceAvailable, @@ -69,6 +76,65 @@ async function handlePdmResourceAvailable( return eventToPublish; } +async function processSqsRecord( + sqsRecord: SQSRecord, + logger: Logger, + senderManagement: ISenderManagement, + notifyMessageProcessor: NotifyMessageProcessor, +): Promise { + const result: ProcessRecordResult = {}; + + let incoming: PDMResourceAvailable | undefined; + let sender: Sender | null = null; + + try { + incoming = parseSqsRecord(sqsRecord, logger); + sender = await senderManagement.getSender({ + senderId: incoming.data.senderId, + }); + + if (sender === null) { + throw new Error( + `Sender not found for senderId: ${incoming.data.senderId}`, + ); + } + + const eventToPublish = await handlePdmResourceAvailable( + notifyMessageProcessor, + incoming, + sender, + ); + + if (eventToPublish.submitted) { + result.submitted = eventToPublish.submitted; + } + if (eventToPublish.skipped) { + result.skipped = eventToPublish.skipped; + } + } catch (error: any) { + logger.warn({ + error: error.message, + description: 'Failed processing message', + messageId: sqsRecord.messageId, + senderId: incoming?.data.senderId, + }); + + if (error instanceof RequestNotifyError && incoming && sender) { + // CCM-12858 A/C 5: When any other response other than a 201 is returned, don't retry the message + result.rejected = mapPdmEventToMessageRequestRejected( + incoming, + sender, + error.errorCode, + ); + } else { + // this might be a transient error so we notify the queue to retry + result.batchItemFailure = { itemIdentifier: sqsRecord.messageId }; + } + } + + return result; +} + export const createHandler = ({ eventPublisher, logger, @@ -87,58 +153,33 @@ export const createHandler = ({ const submittedEvents: MessageRequestSubmitted[] = []; const rejectedEvents: MessageRequestRejected[] = []; - let incoming: PDMResourceAvailable; - let sender: Sender | null; - - await Promise.all( - sqsEvent.Records.map(async (sqsRecord: SQSRecord) => { - try { - incoming = parseSqsRecord(sqsRecord, logger); - sender = await senderManagement.getSender({ - senderId: incoming.data.senderId, - }); - if (sender === null) { - throw new Error( - `Sender not found for senderId: ${incoming.data.senderId}`, - ); - } - - const eventToPublish = await handlePdmResourceAvailable( - notifyMessageProcessor, - incoming, - sender, - ); - - if (eventToPublish.submitted) { - submittedEvents.push(eventToPublish.submitted); - } - if (eventToPublish.skipped) { - skippedEvents.push(eventToPublish.skipped); - } - } catch (error: any) { - logger.warn({ - error: error.message, - description: 'Failed processing message', - messageId: sqsRecord.messageId, - senderId: incoming?.data.senderId, - }); - if (error instanceof RequestNotifyError) { - // CCM-12858 A/C 5: When any other response other than a 201 is returned, don't retry the message - rejectedEvents.push( - mapPdmEventToMessageRequestRejected( - incoming, - sender!, - error.errorCode, - ), - ); - } else { - // this might be a transient error so we notify the queue to retry - batchItemFailures.push({ itemIdentifier: sqsRecord.messageId }); - } - } - }), + const results = await Promise.all( + sqsEvent.Records.map((sqsRecord) => + processSqsRecord( + sqsRecord, + logger, + senderManagement, + notifyMessageProcessor, + ), + ), ); + // Aggregate results from all processed records + for (const result of results) { + if (result.submitted) { + submittedEvents.push(result.submitted); + } + if (result.skipped) { + skippedEvents.push(result.skipped); + } + if (result.rejected) { + rejectedEvents.push(result.rejected); + } + if (result.batchItemFailure) { + batchItemFailures.push(result.batchItemFailure); + } + } + await Promise.all( [ submittedEvents.length > 0 && diff --git a/scripts/docker/examples/python/assets/hello_world/requirements.txt b/scripts/docker/examples/python/assets/hello_world/requirements.txt index 1921a4285..fd7fb3118 100644 --- a/scripts/docker/examples/python/assets/hello_world/requirements.txt +++ b/scripts/docker/examples/python/assets/hello_world/requirements.txt @@ -8,5 +8,5 @@ MarkupSafe==2.1.3 pip==25.3 setuptools==78.1.1 Werkzeug==3.1.4 -wheel==0.41.1 +wheel==0.46.2 WTForms==3.0.1