22
33using System . Collections . Concurrent ;
44using System . Collections . Specialized ;
5+ using System . Net . Http . Json ;
56using System . Text ;
67using System . Text . Json ;
78using Common ;
1112using Microsoft . Extensions . Options ;
1213using Model ;
1314using DataServices . Client ;
15+ using FluentValidation . Validators ;
1416
1517public class ProcessNemsUpdate
1618{
@@ -61,61 +63,35 @@ public async Task Run([BlobTrigger("nems-updates/{name}", Connection = "nemsmesh
6163 {
6264 try
6365 {
64- string ? nhsNumber = await GetNhsNumberFromFile ( blobStream , name ) ;
66+ var nhsNumber = await GetNhsNumberFromFile ( blobStream , name ) ;
6567
66- if ( nhsNumber == null )
67- {
68- _logger . LogInformation ( "There is no NHS number, unable to continue." ) ;
69- return ;
70- }
71-
72- if ( ! ValidationHelper . ValidateNHSNumber ( nhsNumber ) )
68+ if ( ! ValidationHelper . ValidateNHSNumber ( nhsNumber ! ) )
7369 {
70+ _logger . LogError ( "There was a problem parsing the NHS number from blob store in the ProcessNemsUpdate function" ) ;
7471 throw new InvalidDataException ( "Invalid NHS Number" ) ;
7572 }
76- nhsNumberLong = long . Parse ( nhsNumber ) ;
77-
78- string ? pdsRecord = await RetrievePdsRecord ( nhsNumber ) ;
73+ nhsNumberLong = long . Parse ( nhsNumber ! ) ;
7974
80- if ( pdsRecord == null )
75+ var pdsResponse = await RetrievePdsRecord ( nhsNumber ! ) ;
76+ if ( pdsResponse ! . StatusCode == System . Net . HttpStatusCode . NotFound )
8177 {
82- _logger . LogInformation ( "There is no PDS record, unable to continue." ) ;
78+ await ProcessPdsResponse ( pdsResponse , nhsNumber ! ) ;
79+ // we can stop processing here as we know that not found means the participant ether needed an update or they were actually not found
8380 return ;
8481 }
8582
86- var retrievedPdsRecord = JsonSerializer . Deserialize < PdsDemographic > ( pdsRecord ) ;
83+ pdsResponse . EnsureSuccessStatusCode ( ) ;
84+
85+ var retrievedPdsRecord = await pdsResponse . Content . ReadFromJsonAsync < PdsDemographic > ( ) ;
8786
8887 if ( retrievedPdsRecord ? . NhsNumber == nhsNumber )
8988 {
9089 _logger . LogInformation ( "NHS numbers match, processing the retrieved PDS record." ) ;
91- await ProcessRecord ( retrievedPdsRecord ) ;
90+ await ProcessRecord ( new Participant ( retrievedPdsRecord ! ) ) ;
9291 }
93-
9492 else
9593 {
96- var supersededRecord = new PdsDemographic ( )
97- {
98- NhsNumber = nhsNumber ,
99- SupersededByNhsNumber = retrievedPdsRecord ? . NhsNumber ,
100- PrimaryCareProvider = null ,
101- ReasonForRemoval = "ORR" ,
102- RemovalEffectiveFromDate = DateTime . UtcNow . Date . ToString ( "yyyyMMdd" )
103- } ;
104-
105- _logger . LogInformation ( "NHS numbers do not match, processing the superseded record." ) ;
106- await ProcessRecord ( supersededRecord ) ;
107-
108- /*information exception raised for RuleId 60 and Rule name 'SupersededNhsNumber'*/
109- var ruleId = 60 ; // Rule 60 is for Superseded rule
110- var ruleName = "SupersededNhsNumber" ; //Superseded rule name
111- await _exceptionHandler . CreateTransformExecutedExceptions ( supersededRecord . ToCohortDistributionParticipant ( ) , ruleName , ruleId ) ;
112-
113- var unsubscribedFromNems = await UnsubscribeNems ( nhsNumber ) ;
114-
115- if ( unsubscribedFromNems )
116- {
117- _logger . LogInformation ( "Successfully unsubscribed from NEMS." ) ;
118- }
94+ await UnsubscribeFromNems ( nhsNumber ! , retrievedPdsRecord ! ) ;
11995 }
12096
12197 }
@@ -126,6 +102,56 @@ public async Task Run([BlobTrigger("nems-updates/{name}", Connection = "nemsmesh
126102
127103 }
128104
105+ private async Task ProcessPdsResponse ( HttpResponseMessage pdsResponse , string nhsNumber )
106+ {
107+ var errorResponse = await pdsResponse ! . Content . ReadFromJsonAsync < PdsErrorResponse > ( ) ;
108+ // we now create a record as an update record and send to the manage participant function. Reason for removal for date should be today and the reason for remove of ORR
109+ if ( errorResponse ! . issue ! . FirstOrDefault ( ) ! . details ! . coding ! . FirstOrDefault ( ) ! . code == "INVALIDATED_RESOURCE" )
110+ {
111+ var pdsDemographic = new PdsDemographic ( )
112+ {
113+ NhsNumber = nhsNumber ,
114+ PrimaryCareProvider = null ,
115+ ReasonForRemoval = "ORR" ,
116+ RemovalEffectiveFromDate = DateTime . UtcNow . Date . ToString ( "yyyyMMdd" )
117+ } ;
118+ var participant = new Participant ( pdsDemographic ) ;
119+ participant . RecordType = Actions . Removed ;
120+ //sends record for an update
121+ await ProcessRecord ( participant ) ;
122+ return ;
123+ }
124+ _logger . LogError ( "the PDS function has returned a 404 error. function now stopping processing" ) ;
125+
126+ }
127+
128+ private async Task UnsubscribeFromNems ( string nhsNumber , PdsDemographic retrievedPdsRecord )
129+ {
130+ var supersededRecord = new PdsDemographic ( )
131+ {
132+ NhsNumber = nhsNumber ,
133+ SupersededByNhsNumber = retrievedPdsRecord ? . NhsNumber ,
134+ PrimaryCareProvider = null ,
135+ ReasonForRemoval = "ORR" ,
136+ RemovalEffectiveFromDate = DateTime . UtcNow . Date . ToString ( "yyyyMMdd" )
137+ } ;
138+
139+ _logger . LogInformation ( "NHS numbers do not match, processing the superseded record." ) ;
140+ await ProcessRecord ( new Participant ( supersededRecord ) ) ;
141+
142+ /*information exception raised for RuleId 60 and Rule name 'SupersededNhsNumber'*/
143+ var ruleId = 60 ; // Rule 60 is for Superseded rule
144+ var ruleName = "SupersededNhsNumber" ; //Superseded rule name
145+ await _exceptionHandler . CreateTransformExecutedExceptions ( supersededRecord . ToCohortDistributionParticipant ( ) , ruleName , ruleId ) ;
146+
147+ var unsubscribedFromNems = await UnsubscribeNems ( nhsNumber ) ;
148+
149+ if ( unsubscribedFromNems )
150+ {
151+ _logger . LogInformation ( "Successfully unsubscribed from NEMS." ) ;
152+ }
153+ }
154+
129155 private async Task < string ? > GetNhsNumberFromFile ( Stream blobStream , string name )
130156 {
131157 try
@@ -139,7 +165,7 @@ public async Task Run([BlobTrigger("nems-updates/{name}", Connection = "nemsmesh
139165 }
140166
141167 // Determine format based on file extension and call appropriate parser
142- if ( name . EndsWith ( ".xml" , StringComparison . OrdinalIgnoreCase ) )
168+ if ( name . EndsWith ( ".xml" , System . StringComparison . OrdinalIgnoreCase ) )
143169 {
144170 return _fhirPatientDemographicMapper . ParseFhirXmlNhsNumber ( blobJson ) ;
145171 }
@@ -156,41 +182,32 @@ public async Task Run([BlobTrigger("nems-updates/{name}", Connection = "nemsmesh
156182 }
157183 }
158184
159- private async Task < string ? > RetrievePdsRecord ( string nhsNumber )
185+ private async Task < HttpResponseMessage > RetrievePdsRecord ( string nhsNumber )
160186 {
161- try
187+ var queryParams = new Dictionary < string , string > ( )
162188 {
163- var queryParams = new Dictionary < string , string > ( )
164- {
165- { "nhsNumber" , nhsNumber }
166- } ;
189+ { "nhsNumber" , nhsNumber }
190+ } ;
167191
168- return await _httpClientFunction . SendGet ( _config . RetrievePdsDemographicURL , queryParams ) ;
169- }
170- catch ( Exception ex )
171- {
172- _logger . LogError ( ex , "There was an error retrieving the PDS record." ) ;
173- return null ;
174- }
192+ return await _httpClientFunction . SendGetHttpResponse ( _config . RetrievePdsDemographicURL , queryParams ) ;
175193 }
176194
177- private async Task ProcessRecord ( PdsDemographic pdsDemographic )
195+ private async Task ProcessRecord ( Participant participant )
178196 {
179197 var updateRecord = new ConcurrentQueue < BasicParticipantCsvRecord > ( ) ;
180198
181- var participant = new Participant ( pdsDemographic ) ;
182-
183199 // TODO validate all dates in record before enqueuing
184-
185200 var existingParticipant = await _participantDemographic . GetSingleByFilter ( x => x . NhsNumber == nhsNumberLong ) ;
201+
202+
186203 if ( existingParticipant == null )
187204 {
188205 participant . RecordType = Actions . New ;
189206 _logger . LogWarning ( "The participant doesn't exists in Cohort Manager.A new record will be created in Cohort Manager." ) ;
190207 }
191208 else
192209 {
193- participant . RecordType = Actions . Amended ;
210+ participant . RecordType = participant . RecordType != Actions . Removed ? Actions . Amended : participant . RecordType ;
194211 _logger . LogWarning ( "The participant already exists in Cohort Manager. Existing record will get updated." ) ;
195212 }
196213
@@ -204,6 +221,7 @@ private async Task ProcessRecord(PdsDemographic pdsDemographic)
204221 updateRecord . Enqueue ( basicParticipantCsvRecord ) ;
205222
206223 _logger . LogInformation ( "Sending record to the update queue." ) ;
224+
207225 await _addBatchToQueue . ProcessBatch ( updateRecord , _config . ParticipantManagementTopic ) ;
208226 }
209227
0 commit comments