Skip to content

Commit 48484f3

Browse files
CCM-12858: race condicion fix
1 parent fb8c994 commit 48484f3

1 file changed

Lines changed: 91 additions & 50 deletions

File tree

lambdas/core-notifier-lambda/src/apis/sqs-handler.ts

Lines changed: 91 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ type EventToPublish = {
3939
rejected?: MessageRequestRejected;
4040
};
4141

42+
type ProcessRecordResult = {
43+
submitted?: MessageRequestSubmitted;
44+
skipped?: MessageRequestSkipped;
45+
rejected?: MessageRequestRejected;
46+
batchItemFailure?: SQSBatchItemFailure;
47+
};
48+
4249
async function handlePdmResourceAvailable(
4350
notifyMessageProcessor: NotifyMessageProcessor,
4451
incoming: PDMResourceAvailable,
@@ -69,6 +76,65 @@ async function handlePdmResourceAvailable(
6976
return eventToPublish;
7077
}
7178

79+
async function processSqsRecord(
80+
sqsRecord: SQSRecord,
81+
logger: Logger,
82+
senderManagement: ISenderManagement,
83+
notifyMessageProcessor: NotifyMessageProcessor,
84+
): Promise<ProcessRecordResult> {
85+
const result: ProcessRecordResult = {};
86+
87+
let incoming: PDMResourceAvailable | undefined;
88+
let sender: Sender | null = null;
89+
90+
try {
91+
incoming = parseSqsRecord(sqsRecord, logger);
92+
sender = await senderManagement.getSender({
93+
senderId: incoming.data.senderId,
94+
});
95+
96+
if (sender === null) {
97+
throw new Error(
98+
`Sender not found for senderId: ${incoming.data.senderId}`,
99+
);
100+
}
101+
102+
const eventToPublish = await handlePdmResourceAvailable(
103+
notifyMessageProcessor,
104+
incoming,
105+
sender,
106+
);
107+
108+
if (eventToPublish.submitted) {
109+
result.submitted = eventToPublish.submitted;
110+
}
111+
if (eventToPublish.skipped) {
112+
result.skipped = eventToPublish.skipped;
113+
}
114+
} catch (error: any) {
115+
logger.warn({
116+
error: error.message,
117+
description: 'Failed processing message',
118+
messageId: sqsRecord.messageId,
119+
senderId: incoming?.data.senderId,
120+
});
121+
122+
if (error instanceof RequestNotifyError && incoming && sender) {
123+
// CCM-12858 A/C 5: When any other response other than a 201 is returned, don't retry the message
124+
result.rejected = mapPdmEventToMessageRequestRejected(
125+
incoming,
126+
sender,
127+
error.errorCode,
128+
);
129+
} else {
130+
// this might be a transient error so we notify the queue to retry
131+
result.batchItemFailure = { itemIdentifier: sqsRecord.messageId };
132+
}
133+
}
134+
135+
return result;
136+
}
137+
72138
export const createHandler = ({
73139
eventPublisher,
74140
logger,
@@ -87,58 +153,33 @@ export const createHandler = ({
87153
const submittedEvents: MessageRequestSubmitted[] = [];
88154
const rejectedEvents: MessageRequestRejected[] = [];
89155

90-
let incoming: PDMResourceAvailable;
91-
let sender: Sender | null;
92-
93-
await Promise.all(
94-
sqsEvent.Records.map(async (sqsRecord: SQSRecord) => {
95-
try {
96-
incoming = parseSqsRecord(sqsRecord, logger);
97-
sender = await senderManagement.getSender({
98-
senderId: incoming.data.senderId,
99-
});
100-
if (sender === null) {
101-
throw new Error(
102-
`Sender not found for senderId: ${incoming.data.senderId}`,
103-
);
104-
}
105-
106-
const eventToPublish = await handlePdmResourceAvailable(
107-
notifyMessageProcessor,
108-
incoming,
109-
sender,
110-
);
111-
112-
if (eventToPublish.submitted) {
113-
submittedEvents.push(eventToPublish.submitted);
114-
}
115-
if (eventToPublish.skipped) {
116-
skippedEvents.push(eventToPublish.skipped);
117-
}
118-
} catch (error: any) {
119-
logger.warn({
120-
error: error.message,
121-
description: 'Failed processing message',
122-
messageId: sqsRecord.messageId,
123-
senderId: incoming?.data.senderId,
124-
});
125-
if (error instanceof RequestNotifyError) {
126-
// CCM-12858 A/C 5: When any other response other than a 201 is returned, don't retry the message
127-
rejectedEvents.push(
128-
mapPdmEventToMessageRequestRejected(
129-
incoming,
130-
sender!,
131-
error.errorCode,
132-
),
133-
);
134-
} else {
135-
// this might be a transient error so we notify the queue to retry
136-
batchItemFailures.push({ itemIdentifier: sqsRecord.messageId });
137-
}
138-
}
139-
}),
156+
const results = await Promise.all(
157+
sqsEvent.Records.map((sqsRecord) =>
158+
processSqsRecord(
159+
sqsRecord,
160+
logger,
161+
senderManagement,
162+
notifyMessageProcessor,
163+
),
164+
),
140165
);
141166

167+
// Aggregate results from all processed records
168+
for (const result of results) {
169+
if (result.submitted) {
170+
submittedEvents.push(result.submitted);
171+
}
172+
if (result.skipped) {
173+
skippedEvents.push(result.skipped);
174+
}
175+
if (result.rejected) {
176+
rejectedEvents.push(result.rejected);
177+
}
178+
if (result.batchItemFailure) {
179+
batchItemFailures.push(result.batchItemFailure);
180+
}
181+
}
182+
142183
await Promise.all(
143184
[
144185
submittedEvents.length > 0 &&

0 commit comments

Comments
 (0)