-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathProcessCaasFile.cs
More file actions
243 lines (214 loc) · 10.6 KB
/
ProcessCaasFile.cs
File metadata and controls
243 lines (214 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
namespace NHS.Screening.ReceiveCaasFile;
using System.Text.Json;
using Common;
using Common.Interfaces;
using Data.Database;
using DataServices.Client;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.IdentityModel.Protocols.Configuration;
using Model;
public class ProcessCaasFile : IProcessCaasFile
{
private readonly ILogger<ProcessCaasFile> _logger;
private readonly IReceiveCaasFileHelper _receiveCaasFileHelper;
private readonly ICheckDemographic _checkDemographic;
private readonly ICreateBasicParticipantData _createBasicParticipantData;
private readonly IAddBatchToQueue _addBatchToQueue;
private readonly IExceptionHandler _exceptionHandler;
private readonly IDataServiceClient<ParticipantDemographic> _participantDemographic;
private readonly IRecordsProcessedTracker _recordsProcessTracker;
private readonly IValidateDates _validateDates;
private readonly ICallFunction _callFunction;
private readonly ReceiveCaasFileConfig _config;
private readonly string DemographicURI;
private readonly string AddParticipantQueueName;
private readonly string UpdateParticipantQueueName;
public ProcessCaasFile(
ILogger<ProcessCaasFile> logger,
ICheckDemographic checkDemographic,
ICreateBasicParticipantData createBasicParticipantData,
IAddBatchToQueue addBatchToQueue,
IReceiveCaasFileHelper receiveCaasFileHelper,
IExceptionHandler exceptionHandler,
IDataServiceClient<ParticipantDemographic> participantDemographic,
IRecordsProcessedTracker recordsProcessedTracker,
IValidateDates validateDates,
ICallFunction callFunction,
IOptions<ReceiveCaasFileConfig> receiveCaasFileConfig
)
{
_logger = logger;
_checkDemographic = checkDemographic;
_createBasicParticipantData = createBasicParticipantData;
_addBatchToQueue = addBatchToQueue;
_receiveCaasFileHelper = receiveCaasFileHelper;
_exceptionHandler = exceptionHandler;
_participantDemographic = participantDemographic;
_recordsProcessTracker = recordsProcessedTracker;
_validateDates = validateDates;
_callFunction = callFunction;
_config = receiveCaasFileConfig.Value;
DemographicURI = _config.DemographicURI;
AddParticipantQueueName = _config.AddQueueName;
UpdateParticipantQueueName = _config.UpdateQueueName;
if (string.IsNullOrEmpty(DemographicURI) || string.IsNullOrEmpty(AddParticipantQueueName) || string.IsNullOrEmpty(UpdateParticipantQueueName))
{
_logger.LogError("Required environment variables DemographicURI and PMSUpdateParticipant are missing.");
throw new InvalidConfigurationException("Required environment variables DemographicURI and PMSUpdateParticipant are missing.");
}
}
/// <summary>
/// process a given batch and send it the queue
/// </summary>
/// <param name="values"></param>
/// <param name="options"></param>
/// <param name="screeningService"></param>
/// <param name="name"></param>
/// <returns></returns>
public async Task ProcessRecords(List<ParticipantsParquetMap> values, ParallelOptions options, ScreeningService screeningService, string name)
{
var currentBatch = new Batch();
await Parallel.ForEachAsync(values, options, async (rec, cancellationToken) =>
{
var participant = await _receiveCaasFileHelper.MapParticipant(rec, screeningService.ScreeningId, screeningService.ScreeningName, name);
if (participant == null)
{
return;
}
if (!ValidationHelper.ValidateNHSNumber(participant.NhsNumber))
{
await _exceptionHandler.CreateSystemExceptionLog(new Exception($"Invalid NHS Number was passed in for participant {participant} and file {name}"), participant, name);
return; // skip current participant
}
if (!_validateDates.ValidateAllDates(participant))
{
await _exceptionHandler.CreateSystemExceptionLog(new Exception($"Invalid effective date found in participant data {participant} and file name {name}"), participant, name);
return; // Skip current participant
}
if (!_recordsProcessTracker.RecordAlreadyProcessed(participant.RecordType, participant.NhsNumber))
{
await _exceptionHandler.CreateSystemExceptionLog(new Exception($"Duplicate Participant was in the file"), participant, name);
return; // Skip current participant
}
await AddRecordToBatch(participant, currentBatch, name);
});
if (await _checkDemographic.PostDemographicDataAsync(currentBatch.DemographicData.ToList(), DemographicURI))
{
await AddBatchToQueue(currentBatch, name);
}
}
/// <summary>
/// adds a given record to the current given batch
/// </summary>
/// <param name="participant"></param>
/// <param name="currentBatch"></param>
/// <param name="FileName"></param>
/// <returns></returns>
private async Task AddRecordToBatch(Participant participant, Batch currentBatch, string fileName)
{
var basicParticipantCsvRecord = new BasicParticipantCsvRecord
{
Participant = _createBasicParticipantData.BasicParticipantData(participant),
FileName = fileName,
participant = participant
};
// take note: we don't need to add DemographicData to the queue for update because we loop through all updates in the UpdateParticipant method
switch (participant.RecordType?.Trim())
{
case Actions.New:
await DeleteOldDemographicRecord(basicParticipantCsvRecord, fileName);
currentBatch.DemographicData.Enqueue(participant.ToParticipantDemographic());
currentBatch.AddRecords.Enqueue(basicParticipantCsvRecord);
break;
case Actions.Amended:
await DeleteOldDemographicRecord(basicParticipantCsvRecord, fileName);
currentBatch.DemographicData.Enqueue(participant.ToParticipantDemographic());
currentBatch.UpdateRecords.Enqueue(basicParticipantCsvRecord);
break;
case Actions.Removed:
currentBatch.DeleteRecords.Enqueue(basicParticipantCsvRecord);
break;
default:
await _exceptionHandler.CreateSchemaValidationException(basicParticipantCsvRecord, "RecordType was not set to an expected value");
break;
}
}
private async Task AddBatchToQueue(Batch currentBatch, string name)
{
_logger.LogInformation("sending {Count} records to Add queue", currentBatch.AddRecords.Count);
await _addBatchToQueue.ProcessBatch(currentBatch.AddRecords, AddParticipantQueueName);
_logger.LogInformation("sending Update Records {Count} to queue", currentBatch.UpdateRecords.Count);
await _addBatchToQueue.ProcessBatch(currentBatch.UpdateRecords, UpdateParticipantQueueName);
foreach (var updateRecords in currentBatch.DeleteRecords)
{
await RemoveParticipant(updateRecords, name);
}
// this used to release memory from being used
currentBatch = null;
}
private async Task DeleteOldDemographicRecord(BasicParticipantCsvRecord basicParticipantCsvRecord, string name)
{
try
{
long nhsNumber;
if (!long.TryParse(basicParticipantCsvRecord.participant.NhsNumber, out nhsNumber))
{
throw new FormatException("Unable to parse NHS Number");
}
var participant = await _participantDemographic.GetSingleByFilter(x => x.NhsNumber == nhsNumber);
if (participant != null)
{
var deleted = await _participantDemographic.Delete(participant.ParticipantId.ToString());
_logger.LogInformation(deleted ? "Deleting old Demographic record was successful" : "Deleting old Demographic record was not successful");
return;
}
else
{
_logger.LogWarning("The participant could not be found, when trying to delete old Participant. This could prevent updates from being applied");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Update participant function failed.\nMessage: {Message}\nStack Trace: {StackTrace}", ex.Message, ex.StackTrace);
await CreateError(basicParticipantCsvRecord.participant, name);
}
}
private async Task RemoveParticipant(BasicParticipantCsvRecord basicParticipantCsvRecord, string filename)
{
var allowDeleteRecords = (bool)DatabaseHelper.ConvertBoolStringToBoolByType("AllowDeleteRecords", DataTypes.Boolean);
try
{
if (allowDeleteRecords)
{
_logger.LogInformation("AllowDeleteRecords flag is true, delete record sent to RemoveParticipant function.");
var json = JsonSerializer.Serialize(basicParticipantCsvRecord);
await _callFunction.SendPost(_config.PMSRemoveParticipant, json);
}
else
{
await _exceptionHandler.CreateDeletedRecordException(basicParticipantCsvRecord);
_logger.LogInformation("AllowDeleteRecords flag is false, exception raised for delete record.");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Remove participant function failed.\nMessage: {Message}\nStack Trace: {StackTrace}", ex.Message, ex.StackTrace);
await CreateError(basicParticipantCsvRecord.participant, filename);
}
}
private async Task CreateError(Participant participant, string filename)
{
try
{
_logger.LogError("Cannot parse record type with action: {ParticipantRecordType}", participant.RecordType);
var errorDescription = $"a record has failed to process with the NHS Number: REDACTED because of an incorrect record type";
await _exceptionHandler.CreateRecordValidationExceptionLog(participant.NhsNumber, filename, errorDescription, "", JsonSerializer.Serialize(participant));
}
catch (Exception ex)
{
_logger.LogError(ex, "Handling the exception failed.\nMessage: {Message}\nStack Trace: {StackTrace}", ex.Message, ex.StackTrace);
await _exceptionHandler.CreateSystemExceptionLog(ex, participant, filename);
}
}
}