-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathProcessNemsUpdate.cs
More file actions
241 lines (209 loc) · 9.61 KB
/
ProcessNemsUpdate.cs
File metadata and controls
241 lines (209 loc) · 9.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
namespace NHS.Screening.ProcessNemsUpdate;
using System.Collections.Concurrent;
using System.Collections.Specialized;
using System.Net.Http.Json;
using System.Text;
using System.Text.Json;
using Common;
using Common.Interfaces;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Model;
using DataServices.Client;
using System.Net;
/// <summary>
/// Blob-triggered function that reacts to files dropped in the nems-updates container:
/// it parses the NHS number from the file, fetches the matching PDS record and queues
/// either the PDS record or a superseded record for participant management.
/// </summary>
public class ProcessNemsUpdate
{
    // File name stamped on every record queued by this function.
    private const string QueuedRecordFileName = "NemsMessages";

    // Rule id and name passed to the exception handler when a superseded NHS number is processed.
    private const int SupersededRuleId = 60;
    private const string SupersededRuleName = "SupersededNhsNumber";

    private readonly ILogger<ProcessNemsUpdate> _logger;
    private readonly IFhirPatientDemographicMapper _fhirPatientDemographicMapper;
    private readonly ICreateBasicParticipantData _createBasicParticipantData;
    private readonly IAddBatchToQueue _addBatchToQueue;
    private readonly IHttpClientFunction _httpClientFunction;
    private readonly IExceptionHandler _exceptionHandler;
    private readonly IDataServiceClient<ParticipantDemographic> _participantDemographic;
    private readonly IBlobStorageHelper _blobStorageHelper;
    private readonly ProcessNemsUpdateConfig _config;

    /// <summary>
    /// Creates the function with its injected collaborators.
    /// </summary>
    public ProcessNemsUpdate(
        ILogger<ProcessNemsUpdate> logger,
        IFhirPatientDemographicMapper fhirPatientDemographicMapper,
        ICreateBasicParticipantData createBasicParticipantData,
        IAddBatchToQueue addBatchToQueue,
        IHttpClientFunction httpClientFunction,
        IExceptionHandler exceptionHandler,
        IDataServiceClient<ParticipantDemographic> participantDemographic,
        IOptions<ProcessNemsUpdateConfig> processNemsUpdateConfig,
        IBlobStorageHelper blobStorageHelper)
    {
        _logger = logger;
        _fhirPatientDemographicMapper = fhirPatientDemographicMapper;
        _createBasicParticipantData = createBasicParticipantData;
        _addBatchToQueue = addBatchToQueue;
        _httpClientFunction = httpClientFunction;
        _exceptionHandler = exceptionHandler;
        _participantDemographic = participantDemographic;
        _config = processNemsUpdateConfig.Value;
        _blobStorageHelper = blobStorageHelper;
    }

    /// <summary>
    /// Function that processes files from the nems-updates blob container. There are a number of stages to this function:
    /// 1) Parse the NHS number from the received file.
    /// 2) Use the parsed NHS number to retrieve the PDS record.
    /// 3) Compare the retrieved PDS record NHS number against the parsed NHS number.
    /// 4) If the NHS numbers match, add the PDS record onto the correct participant management queue.
    /// 5) If the NHS numbers do not match, build the required superseded record, then add this record onto the correct participant management queue.
    /// 6) Also if the NHS numbers do not match, unsubscribe the parsed NHS number from NEMS.
    /// Any failure (including a missing/invalid NHS number, a 404 from PDS, or a PDS payload
    /// without an NHS number) results in the file being copied to the poison container.
    /// </summary>
    /// <param name="blobStream">Content of the blob that triggered the function.</param>
    /// <param name="name">Name of the blob; a ".xml" suffix selects the XML parser, anything else the JSON parser.</param>
    /// <returns>
    /// This function returns nothing, only logs information/errors for successful or failing tasks.
    /// </returns>
    [Function(nameof(ProcessNemsUpdate))]
    public async Task Run([BlobTrigger("nems-updates/{name}", Connection = "nemsmeshfolder_STORAGE")] Stream blobStream, string name)
    {
        try
        {
            var nhsNumber = await GetNhsNumberFromFile(blobStream, name);
            if (nhsNumber is null)
            {
                _logger.LogInformation("There is no NHS number, unable to continue.");
                throw new InvalidDataException("No NHS number found"); // Force poison container
            }

            if (!ValidationHelper.ValidateNHSNumber(nhsNumber))
            {
                _logger.LogError("There was a problem parsing the NHS number from blob store in the ProcessNemsUpdate function");
                throw new InvalidDataException("Invalid NHS Number");
            }

            // Kept as a local and passed down explicitly so no per-invocation state lives on the instance.
            var nhsNumberLong = long.Parse(nhsNumber, System.Globalization.CultureInfo.InvariantCulture);

            using var pdsResponse = await RetrievePdsRecord(nhsNumber);
            if (pdsResponse.StatusCode == HttpStatusCode.NotFound)
            {
                _logger.LogError("the PDS function has returned a 404 error for file {FileName}. Moving file to poison container.", name);
                await CopyToPoisonContainer(name);
                return;
            }

            pdsResponse.EnsureSuccessStatusCode();

            var retrievedPdsRecord = await pdsResponse.Content.ReadFromJsonAsync<PdsDemographic>();

            // A missing payload or missing NHS number is a bad response, not evidence that the
            // number was superseded; without this guard the participant would be queued for
            // removal and unsubscribed with no superseding NHS number.
            if (retrievedPdsRecord is null || string.IsNullOrWhiteSpace(retrievedPdsRecord.NhsNumber))
            {
                throw new InvalidDataException("The PDS response did not contain a record with an NHS number");
            }

            if (retrievedPdsRecord.NhsNumber == nhsNumber)
            {
                _logger.LogInformation("NHS numbers match, processing the retrieved PDS record.");
                await ProcessRecord(new Participant(retrievedPdsRecord), nhsNumberLong);
            }
            else
            {
                await UnsubscribeFromNems(nhsNumber, nhsNumberLong, retrievedPdsRecord);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "There was an error processing NEMS update for file {FileName}. Moving to poison container.", name);
            try
            {
                await CopyToPoisonContainer(name);
            }
            catch (Exception poisonEx)
            {
                // Deliberately swallowed after logging: there is nowhere further to escalate from here.
                _logger.LogError(poisonEx, "Failed to copy NEMS file {FileName} to poison container. Manual intervention required.", name);
            }
        }
    }

    /// <summary>
    /// Copies the named file to the configured poison container (with a timestamp) and logs the copy.
    /// </summary>
    /// <param name="fileName">Name of the blob to copy.</param>
    private async Task CopyToPoisonContainer(string fileName)
    {
        await _blobStorageHelper.CopyFileToPoisonAsync(_config.nemsmeshfolder_STORAGE, fileName, _config.NemsMessages, _config.NemsPoisonContainer, addTimestamp: true);
        _logger.LogInformation("Copied failed NEMS file {FileName} to poison container with timestamp.", fileName);
    }

    /// <summary>
    /// Handles an NHS number mismatch: queues a superseded record for the NHS number parsed from
    /// the file, raises the superseded-rule exception entry, then unsubscribes that number from NEMS.
    /// </summary>
    /// <param name="nhsNumber">NHS number parsed from the file.</param>
    /// <param name="nhsNumberLong">The same NHS number as a <see cref="long"/>, used for the participant lookup.</param>
    /// <param name="retrievedPdsRecord">PDS record whose NHS number differs from <paramref name="nhsNumber"/>.</param>
    private async Task UnsubscribeFromNems(string nhsNumber, long nhsNumberLong, PdsDemographic retrievedPdsRecord)
    {
        var supersededRecord = new PdsDemographic()
        {
            NhsNumber = nhsNumber,
            SupersededByNhsNumber = retrievedPdsRecord.NhsNumber,
            PrimaryCareProvider = null,
            ReasonForRemoval = "ORR",
            // Invariant culture keeps the yyyyMMdd output independent of the host's culture/calendar.
            RemovalEffectiveFromDate = DateTime.UtcNow.Date.ToString("yyyyMMdd", System.Globalization.CultureInfo.InvariantCulture)
        };

        _logger.LogInformation("NHS numbers do not match, processing the superseded record.");
        await ProcessRecord(new Participant(supersededRecord), nhsNumberLong);

        // Information exception raised for the superseded NHS number rule.
        await _exceptionHandler.CreateTransformExecutedExceptions(supersededRecord.ToCohortDistributionParticipant(), SupersededRuleName, SupersededRuleId);

        var unsubscribedFromNems = await UnsubscribeNems(nhsNumber);
        if (unsubscribedFromNems)
        {
            _logger.LogInformation("Successfully unsubscribed from NEMS.");
        }
        else
        {
            // Previously a non-success response was dropped silently.
            _logger.LogWarning("Failed to unsubscribe from NEMS.");
        }
    }

    /// <summary>
    /// Reads the whole blob as UTF-8 text and asks the FHIR mapper for the NHS number,
    /// using the XML parser for ".xml" files and the JSON parser otherwise.
    /// </summary>
    /// <param name="blobStream">Blob content; it is disposed together with the reader.</param>
    /// <param name="name">Blob name, used for logging and format detection.</param>
    /// <returns>The value returned by the mapper, or <c>null</c> if reading or parsing threw.</returns>
    private async Task<string?> GetNhsNumberFromFile(Stream blobStream, string name)
    {
        try
        {
            _logger.LogInformation("Downloading file from the blob, file: {Name}.", name);

            string fileContent;
            using (var reader = new StreamReader(blobStream, Encoding.UTF8))
            {
                fileContent = await reader.ReadToEndAsync();
            }

            // Determine format based on file extension and call appropriate parser
            var isXml = name.EndsWith(".xml", System.StringComparison.OrdinalIgnoreCase);
            return isXml
                ? _fhirPatientDemographicMapper.ParseFhirXmlNhsNumber(fileContent)
                : _fhirPatientDemographicMapper.ParseFhirJsonNhsNumber(fileContent);
        }
        catch (Exception ex)
        {
            // Best effort: the caller treats null as "no NHS number" and poisons the file.
            _logger.LogError(ex, "There was an error getting the NHS number from the file.");
            return null;
        }
    }

    /// <summary>
    /// Sends a GET to the configured PDS demographic URL with the NHS number as a query parameter.
    /// </summary>
    /// <param name="nhsNumber">NHS number to look up.</param>
    /// <returns>The raw HTTP response; the caller inspects the status code and disposes it.</returns>
    private async Task<HttpResponseMessage> RetrievePdsRecord(string nhsNumber)
    {
        var queryParams = new Dictionary<string, string>()
        {
            { "nhsNumber", nhsNumber }
        };

        return await _httpClientFunction.SendGetResponse(_config.RetrievePdsDemographicURL, queryParams);
    }

    /// <summary>
    /// Marks the participant as New or Amended depending on whether a demographic record already
    /// exists for the NHS number, then sends it to the participant management topic.
    /// </summary>
    /// <param name="participant">Participant to queue; its <c>RecordType</c> is set here.</param>
    /// <param name="nhsNumberLong">NHS number parsed from the file, used for the existence lookup.</param>
    private async Task ProcessRecord(Participant participant, long nhsNumberLong)
    {
        var updateRecord = new ConcurrentQueue<BasicParticipantCsvRecord>();

        // TODO validate all dates in record before enqueuing
        var existingParticipant = await _participantDemographic.GetSingleByFilter(x => x.NhsNumber == nhsNumberLong);

        if (existingParticipant == null)
        {
            participant.RecordType = Actions.New;
            _logger.LogWarning("The participant doesn't exists in Cohort Manager.A new record will be created in Cohort Manager.");
        }
        else
        {
            participant.RecordType = Actions.Amended;
            _logger.LogWarning("The participant already exists in Cohort Manager. Existing record will get updated.");
        }

        var basicParticipantCsvRecord = new BasicParticipantCsvRecord
        {
            BasicParticipantData = _createBasicParticipantData.BasicParticipantData(participant),
            FileName = QueuedRecordFileName,
            Participant = participant
        };

        updateRecord.Enqueue(basicParticipantCsvRecord);

        _logger.LogInformation("Sending record to the update queue.");
        await _addBatchToQueue.ProcessBatch(updateRecord, _config.ParticipantManagementTopic);
    }

    /// <summary>
    /// Posts an unsubscribe request for the NHS number to the configured NEMS unsubscribe URL.
    /// </summary>
    /// <param name="nhsNumber">NHS number to unsubscribe.</param>
    /// <returns><c>true</c> when the response has a success status code; <c>false</c> otherwise or on any exception.</returns>
    private async Task<bool> UnsubscribeNems(string nhsNumber)
    {
        try
        {
            // A Dictionary serialises to {"NhsNumber":"..."}. The NameValueCollection used before
            // is a non-generic collection that enumerates its keys only, so System.Text.Json
            // emitted ["NhsNumber"] and the value never reached the endpoint.
            // NOTE(review): confirm the unsubscribe endpoint reads "NhsNumber" from the JSON body.
            var payload = new Dictionary<string, string> { { "NhsNumber", nhsNumber } };

            var response = await _httpClientFunction.SendPost(_config.UnsubscribeNemsSubscriptionUrl, JsonSerializer.Serialize(payload));
            return response.IsSuccessStatusCode;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "There was an error unsubscribing from NEMS.");
            return false;
        }
    }
}