Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace NHS.CohortManager.DemographicServices;

using Model;

public interface IPdsProcessor
{
Task ProcessPdsNotFoundResponse(HttpResponseMessage pdsResponse, string nhsNumber);
Comment thread
SamRobinson75684 marked this conversation as resolved.
Task ProcessRecord(Participant participant);
Task<bool> UpsertDemographicRecordFromPDS(ParticipantDemographic participantDemographic);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
namespace NHS.CohortManager.DemographicServices;

using System.Collections.Concurrent;
using System.Net.Http.Json;
using Common;
using DataServices.Client;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Model;

public class PdsProcessor : IPdsProcessor
{
private readonly ILogger<PdsProcessor> _logger;

private readonly IDataServiceClient<ParticipantDemographic> _participantDemographicClient;
private readonly ICreateBasicParticipantData _createBasicParticipantData;
private readonly RetrievePDSDemographicConfig _config;
private readonly IAddBatchToQueue _addBatchToQueue;


public PdsProcessor(
ILogger<PdsProcessor> logger,
ICreateBasicParticipantData createBasicParticipantData,
IDataServiceClient<ParticipantDemographic> participantDemographicClient,
IAddBatchToQueue addBatchToQueue,
IOptions<RetrievePDSDemographicConfig> retrievePDSDemographicConfig)
{
_logger = logger;
_participantDemographicClient = participantDemographicClient;
_createBasicParticipantData = createBasicParticipantData;
_addBatchToQueue = addBatchToQueue;
_config = retrievePDSDemographicConfig.Value;
}

/// <summary>
/// processes pds error responses. Sends a record to distribute participant via service bus
/// </summary>
/// <param name="pdsResponse"></param>
/// <param name="nhsNumber"></param>
/// <returns></returns>
public async Task ProcessPdsNotFoundResponse(HttpResponseMessage pdsResponse, string nhsNumber)
{
var errorResponse = await pdsResponse!.Content.ReadFromJsonAsync<PdsErrorResponse>();
// we now create a record as an update record and send to the manage participant function. Reason for removal for date should be today and the reason for remove of ORR
if (errorResponse!.issue!.FirstOrDefault()!.details!.coding!.FirstOrDefault()!.code == PdsConstants.InvalidatedResourceCode)
{
var pdsDemographic = new PdsDemographic()
{
NhsNumber = nhsNumber,
PrimaryCareProvider = null,
ReasonForRemoval = PdsConstants.OrrRemovalReason,
RemovalEffectiveFromDate = DateTime.UtcNow.Date.ToString("yyyyMMdd")
};
var participant = new Participant(pdsDemographic);
participant.RecordType = Actions.Removed;
//sends record for an update
await ProcessRecord(participant);
return;
}
_logger.LogError("the PDS function has returned a 404 error. function now stopping processing");
}

/// <summary>
/// sends a participant record to the distribute service bus topic
/// </summary>
/// <param name="participant"></param>
/// <returns></returns>
public async Task ProcessRecord(Participant participant)
{
var updateRecord = new ConcurrentQueue<BasicParticipantCsvRecord>();
participant.RecordType = Actions.Removed;

var basicParticipantCsvRecord = new BasicParticipantCsvRecord
{
BasicParticipantData = _createBasicParticipantData.BasicParticipantData(participant),
FileName = PdsConstants.DefaultFileName,
Participant = participant
};

updateRecord.Enqueue(basicParticipantCsvRecord);

_logger.LogInformation("Sending record to the update queue.");
await _addBatchToQueue.ProcessBatch(updateRecord, _config.ParticipantManagementTopic);
}

/// <summary>
/// adds or updates a demographic record depending on if an record already exists in the database
/// </summary>
/// <param name="participantDemographic"></param>
/// <returns></returns>
public async Task<bool> UpsertDemographicRecordFromPDS(ParticipantDemographic participantDemographic)
{
ParticipantDemographic oldParticipantDemographic = await _participantDemographicClient.GetSingleByFilter(i => i.NhsNumber == participantDemographic.NhsNumber);

if (oldParticipantDemographic == null)
{
_logger.LogInformation("Participant Demographic record not found, attemping to add Participant Demographic.");
bool addSuccess = await _participantDemographicClient.Add(participantDemographic);

if (addSuccess)
{
_logger.LogInformation("Successfully added Participant Demographic.");
return true;
}

_logger.LogError("Failed to add Participant Demographic.");
return false;
}

_logger.LogInformation("Participant Demographic record found, attempting to update Participant Demographic.");
participantDemographic.ParticipantId = oldParticipantDemographic.ParticipantId;
bool updateSuccess = await _participantDemographicClient.Update(participantDemographic);

if (updateSuccess)
{
_logger.LogInformation("Successfully updated Participant Demographic.");
return true;
}

_logger.LogError("Failed to update Participant Demographic.");
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
services.AddSingleton<IHttpParserHelper, HttpParserHelper>();
services.AddSingleton<IFhirPatientDemographicMapper, FhirPatientDemographicMapper>();
services.AddSingleton<IAddBatchToQueue, AddBatchToQueue>();
services.AddSingleton<IPdsProcessor, PdsProcessor>();
services.AddSingleton<ICreateBasicParticipantData, CreateBasicParticipantData>();
// Register health checks
services.AddBasicHealthCheck("RetrievePdsDemographic");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ namespace NHS.CohortManager.DemographicServices;
using Microsoft.Extensions.Options;
using System.Threading.Tasks;
using Model;
using System.Net.Http.Json;
using System.Collections.Concurrent;

public class RetrievePdsDemographic
{
Expand All @@ -22,10 +20,8 @@ public class RetrievePdsDemographic
private readonly IHttpClientFunction _httpClientFunction;
private readonly RetrievePDSDemographicConfig _config;
private readonly IFhirPatientDemographicMapper _fhirPatientDemographicMapper;
private readonly IDataServiceClient<ParticipantDemographic> _participantDemographicClient;
private readonly IBearerTokenService _bearerTokenService;
private readonly ICreateBasicParticipantData _createBasicParticipantData;
private readonly IAddBatchToQueue _addBatchToQueue;
private readonly IPdsProcessor _pdsProcessor;
private const string PdsParticipantUrlFormat = "{0}/{1}";


Expand All @@ -35,21 +31,17 @@ public RetrievePdsDemographic(
IHttpClientFunction httpClientFunction,
IFhirPatientDemographicMapper fhirPatientDemographicMapper,
IOptions<RetrievePDSDemographicConfig> retrievePDSDemographicConfig,
IDataServiceClient<ParticipantDemographic> participantDemographicClient,
ICreateBasicParticipantData createBasicParticipantData,
IAddBatchToQueue addBatchToQueue,
IBearerTokenService bearerTokenService
IBearerTokenService bearerTokenService,
IPdsProcessor pdsProcessor
)
{
_logger = logger;
_createResponse = createResponse;
_httpClientFunction = httpClientFunction;
_fhirPatientDemographicMapper = fhirPatientDemographicMapper;
_config = retrievePDSDemographicConfig.Value;
_participantDemographicClient = participantDemographicClient;
_createBasicParticipantData = createBasicParticipantData;
_bearerTokenService = bearerTokenService;
_addBatchToQueue = addBatchToQueue;
_pdsProcessor = pdsProcessor;
}

// TODO: Need to send an exception to the EXCEPTION_MANAGEMENT table whenever this function returns a non OK status.
Expand Down Expand Up @@ -82,14 +74,14 @@ public async Task<HttpResponseData> Run([HttpTrigger(AuthorizationLevel.Anonymou

if (response.StatusCode == HttpStatusCode.NotFound || pdsDemographic.ConfidentialityCode == "R")
{
await ProcessPdsNotFoundResponse(response, nhsNumber);
await _pdsProcessor.ProcessPdsNotFoundResponse(response, nhsNumber);
return _createResponse.CreateHttpResponse(HttpStatusCode.NotFound, req, "PDS returned a 404 please database for details");
}

response.EnsureSuccessStatusCode();

var participantDemographic = pdsDemographic.ToParticipantDemographic();
var upsertResult = await UpsertDemographicRecordFromPDS(participantDemographic);
var upsertResult = await _pdsProcessor.UpsertDemographicRecordFromPDS(participantDemographic);

return upsertResult ?
_createResponse.CreateHttpResponse(HttpStatusCode.OK, req, JsonSerializer.Serialize(participantDemographic)) :
Expand All @@ -101,79 +93,4 @@ public async Task<HttpResponseData> Run([HttpTrigger(AuthorizationLevel.Anonymou
return _createResponse.CreateHttpResponse(HttpStatusCode.InternalServerError, req);
}
}

private async Task ProcessPdsNotFoundResponse(HttpResponseMessage pdsResponse, string nhsNumber)
{
var errorResponse = await pdsResponse!.Content.ReadFromJsonAsync<PdsErrorResponse>();
// we now create a record as an update record and send to the manage participant function. Reason for removal for date should be today and the reason for remove of ORR
if (errorResponse!.issue!.FirstOrDefault()!.details!.coding!.FirstOrDefault()!.code == PdsConstants.InvalidatedResourceCode)
{
var pdsDemographic = new PdsDemographic()
{
NhsNumber = nhsNumber,
PrimaryCareProvider = null,
ReasonForRemoval = PdsConstants.OrrRemovalReason,
RemovalEffectiveFromDate = DateTime.UtcNow.Date.ToString("yyyyMMdd")
};
var participant = new Participant(pdsDemographic);
participant.RecordType = Actions.Removed;
//sends record for an update
await ProcessRecord(participant);
return;
}
_logger.LogError("the PDS function has returned a 404 error. function now stopping processing");
}


private async Task ProcessRecord(Participant participant)
{
var updateRecord = new ConcurrentQueue<BasicParticipantCsvRecord>();
participant.RecordType = participant.RecordType = Actions.Removed;

var basicParticipantCsvRecord = new BasicParticipantCsvRecord
{
BasicParticipantData = _createBasicParticipantData.BasicParticipantData(participant),
FileName = PdsConstants.DefaultFileName,
Participant = participant
};

updateRecord.Enqueue(basicParticipantCsvRecord);

_logger.LogInformation("Sending record to the update queue.");
await _addBatchToQueue.ProcessBatch(updateRecord, _config.ParticipantManagementTopic);
}


private async Task<bool> UpsertDemographicRecordFromPDS(ParticipantDemographic participantDemographic)
{
ParticipantDemographic oldParticipantDemographic = await _participantDemographicClient.GetSingleByFilter(i => i.NhsNumber == participantDemographic.NhsNumber);

if (oldParticipantDemographic == null)
{
_logger.LogInformation("Participant Demographic record not found, attemping to add Participant Demographic.");
bool addSuccess = await _participantDemographicClient.Add(participantDemographic);

if (addSuccess)
{
_logger.LogInformation("Successfully added Participant Demographic.");
return true;
}

_logger.LogError("Failed to add Participant Demographic.");
return false;
}

_logger.LogInformation("Participant Demographic record found, attempting to update Participant Demographic.");
participantDemographic.ParticipantId = oldParticipantDemographic.ParticipantId;
bool updateSuccess = await _participantDemographicClient.Update(participantDemographic);

if (updateSuccess)
{
_logger.LogInformation("Successfully updated Participant Demographic.");
return true;
}

_logger.LogError("Failed to update Participant Demographic.");
return false;
}
}
17 changes: 17 additions & 0 deletions application/CohortManager/src/Functions/Functions.sln
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JWTTokenServiceTests", "..\
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AuthClientCredentialsTests", "..\..\..\..\tests\UnitTests\AuthClientCredentialsTests\AuthClientCredentialsTests.csproj", "{59CBDBE5-29BE-F38C-80E6-40843F2F8AF6}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PdsProcesserTests", "PdsProcesserTests", "{5555D2A1-8C8F-5B64-9F84-08EFE0FC7CD8}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PdsProcessorTests", "..\..\..\..\tests\PdsProcessorTests\PdsProcessorTests.csproj", "{FC22C311-57DD-B069-4041-AD2AC8F80B5D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1367,6 +1371,18 @@ Global
{59CBDBE5-29BE-F38C-80E6-40843F2F8AF6}.Release|x64.Build.0 = Release|Any CPU
{59CBDBE5-29BE-F38C-80E6-40843F2F8AF6}.Release|x86.ActiveCfg = Release|Any CPU
{59CBDBE5-29BE-F38C-80E6-40843F2F8AF6}.Release|x86.Build.0 = Release|Any CPU
{FC22C311-57DD-B069-4041-AD2AC8F80B5D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FC22C311-57DD-B069-4041-AD2AC8F80B5D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FC22C311-57DD-B069-4041-AD2AC8F80B5D}.Debug|x64.ActiveCfg = Debug|Any CPU
{FC22C311-57DD-B069-4041-AD2AC8F80B5D}.Debug|x64.Build.0 = Debug|Any CPU
{FC22C311-57DD-B069-4041-AD2AC8F80B5D}.Debug|x86.ActiveCfg = Debug|Any CPU
{FC22C311-57DD-B069-4041-AD2AC8F80B5D}.Debug|x86.Build.0 = Debug|Any CPU
{FC22C311-57DD-B069-4041-AD2AC8F80B5D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FC22C311-57DD-B069-4041-AD2AC8F80B5D}.Release|Any CPU.Build.0 = Release|Any CPU
{FC22C311-57DD-B069-4041-AD2AC8F80B5D}.Release|x64.ActiveCfg = Release|Any CPU
{FC22C311-57DD-B069-4041-AD2AC8F80B5D}.Release|x64.Build.0 = Release|Any CPU
{FC22C311-57DD-B069-4041-AD2AC8F80B5D}.Release|x86.ActiveCfg = Release|Any CPU
{FC22C311-57DD-B069-4041-AD2AC8F80B5D}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1428,5 +1444,6 @@ Global
{52C72C2E-9A76-4ECF-A210-C3F7C584C193} = {19500E0D-AAAB-6F02-E24F-82619ACA2290}
{4BD680A2-1ACB-7D6B-B2FD-8EBE9AEB5050} = {E8E33C5F-F9FB-3ACA-2B58-298ED48517C1}
{59CBDBE5-29BE-F38C-80E6-40843F2F8AF6} = {E8E33C5F-F9FB-3ACA-2B58-298ED48517C1}
{FC22C311-57DD-B069-4041-AD2AC8F80B5D} = {5555D2A1-8C8F-5B64-9F84-08EFE0FC7CD8}
EndGlobalSection
EndGlobal
Loading
Loading