Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions application/CohortManager/compose.data-services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,23 @@ services:
- DtOsDatabaseConnectionString=Server=db,1433;Database=${DB_NAME};User Id=SA;Password=${PASSWORD};TrustServerCertificate=True
- AcceptableLatencyThresholdMs=500

reference-data-updater:
container_name: reference-data-updater
image: cohort-manager-reference-data-updater
networks: [cohman-network]
build:
context: ./src/Functions/
dockerfile: screeningDataServices/ReferenceDataUpdater/Dockerfile
args:
BASE_IMAGE: ${FUNCTION_BASE_IMAGE}
environment:
- DtOsDatabaseConnectionString=Server=db,1433;Database=${DB_NAME};User Id=SA;Password=${PASSWORD};TrustServerCertificate=True
- ServiceBusConnectionString=${SERVICE_BUS_CONNECTION_STRING}
- ReferenceDataTopicName=reference-data-updates
- ReferenceDataSubscription=reference-data-updater-sub
- AzureWebJobsStorage=${AZURE_WEB_JOBS_STORAGE}
- SeedDataBlobContainer=seed-data

servicenow-cases-data-service:
container_name: servicenow-cases-data-service
image: cohort-manager-servicenow-cases-data-service
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace Model;

using System.Text.Json;

public class ReferenceDataUpdateMessage
{
public required string DataType { get; set; }
public required JsonElement Data { get; set; }
public string? CorrelationId { get; set; }
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
ARG BASE_IMAGE
FROM ${BASE_IMAGE} AS function

COPY ./screeningDataServices/ReferenceDataUpdater /app/src/dotnet-function-app
WORKDIR /app/src/dotnet-function-app

RUN --mount=type=cache,target=/root/.nuget/packages \
dotnet publish ./*.csproj --output /home/site/wwwroot

# To enable ssh & remote debugging on app service change the base image to the one below
# FROM mcr.microsoft.com/azure-functions/dotnet-isolated:4-dotnet-isolated8.0-appservice
FROM mcr.microsoft.com/azure-functions/dotnet-isolated:4-dotnet-isolated8.0
ENV AzureWebJobsScriptRoot=/home/site/wwwroot \
AzureFunctionsJobHost__Logging__Console__IsEnabled=true

COPY --from=function ["/home/site/wwwroot", "/home/site/wwwroot"]

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace ReferenceDataUpdater;

using System.Text.Json;

public interface IReferenceDataInsertHandler
{
Task<bool> ProcessRecord(string dataType, JsonElement data);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Common;
using DataServices.Core;
using DataServices.Database;
using HealthChecks.Extensions;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using ReferenceDataUpdater;

var host = new HostBuilder()
.ConfigureFunctionsWorkerDefaults()
.AddDataServicesHandler<DataServicesContext>()
.ConfigureServices(services =>
{
services.AddSingleton<IBlobStorageHelper, BlobStorageHelper>();
services.AddScoped<IReferenceDataInsertHandler, ReferenceDataInsertHandler>();
services.AddDatabaseHealthCheck("ReferenceDataUpdater");
})
.AddTelemetry()
.Build();

await host.RunAsync();
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
namespace ReferenceDataUpdater;

using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;
using Common;
using DataServices.Core;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Model;

public class ReferenceDataInsertHandler : IReferenceDataInsertHandler
{
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNameCaseInsensitive = true,
WriteIndented = true
};

private static readonly Dictionary<string, (Type EntityType, string BlobFileName)> TypeRegistry = new(StringComparer.OrdinalIgnoreCase)
{
["BsSelectGpPractice"] = (typeof(BsSelectGpPractice), "BsSelectGpPractice.json"),
["BsSelectOutCode"] = (typeof(BsSelectOutCode), "BsSelectOutCode.json"),
["CurrentPosting"] = (typeof(CurrentPosting), "CurrentPosting.json"),
["ExcludedSMULookup"] = (typeof(ExcludedSMULookup), "ExcludedSMULookup.json"),
["LanguageCode"] = (typeof(LanguageCode), "LanguageCode.json"),
["ScreeningLkp"] = (typeof(ScreeningLkp), "ScreeningLkp.json"),
["GeneCodeLkp"] = (typeof(GeneCodeLkp), "GeneCodeLkp.json"),
["HigherRiskReferralReasonLkp"] = (typeof(HigherRiskReferralReasonLkp), "HigherRiskReferralReasonLkp.json"),
["BsoOrganisation"] = (typeof(BsoOrganisation), "BsoOrganisation.json"),
["GenderMaster"] = (typeof(GenderMaster), "GenderMaster.json"),
};

private readonly IServiceProvider _serviceProvider;
private readonly IBlobStorageHelper _blobStorageHelper;
private readonly ILogger<ReferenceDataInsertHandler> _logger;
private readonly string _storageConnectionString;
private readonly string _seedDataContainerName;
private static readonly ConcurrentDictionary<Type, Func<object, object, Task<bool>>> _insertDelegates = new();

public ReferenceDataInsertHandler(
IServiceProvider serviceProvider,
IBlobStorageHelper blobStorageHelper,
ILogger<ReferenceDataInsertHandler> logger)
{
_serviceProvider = serviceProvider;
_blobStorageHelper = blobStorageHelper;
_logger = logger;
_storageConnectionString = Environment.GetEnvironmentVariable("AzureWebJobsStorage")
?? throw new InvalidOperationException("AzureWebJobsStorage environment variable is not set.");
_seedDataContainerName = Environment.GetEnvironmentVariable("SeedDataBlobContainer") ?? "seed-data";
}

public async Task<bool> ProcessRecord(string dataType, JsonElement data)
{
if (!TypeRegistry.TryGetValue(dataType, out var registration))
{
_logger.LogError("Unknown reference data type: {DataType}", dataType);
return false;
}

var entity = JsonSerializer.Deserialize(data.GetRawText(), registration.EntityType, JsonOptions);
if (entity is null)
{
_logger.LogError("Failed to deserialise payload for type {DataType}.", dataType);
return false;
}

var dbInserted = await InsertIntoDatabase(dataType, registration.EntityType, entity);
await AppendToBlob(dataType, registration.BlobFileName, data);

return dbInserted;
}

private async Task<bool> InsertIntoDatabase(string dataType, Type entityType, object entity)
{
try
{
var accessorType = typeof(IDataServiceAccessor<>).MakeGenericType(entityType);
var accessor = _serviceProvider.GetRequiredService(accessorType);

var invoker = _insertDelegates.GetOrAdd(entityType, t =>
{
var aType = typeof(IDataServiceAccessor<>).MakeGenericType(t);
var method = aType.GetMethod("InsertSingle")!;

var accessorParam = System.Linq.Expressions.Expression.Parameter(typeof(object), "a");
var entityParam = System.Linq.Expressions.Expression.Parameter(typeof(object), "e");

var call = System.Linq.Expressions.Expression.Call(
System.Linq.Expressions.Expression.Convert(accessorParam, aType),
method,
System.Linq.Expressions.Expression.Convert(entityParam, t));

return System.Linq.Expressions.Expression.Lambda<Func<object, object, Task<bool>>>(
call, accessorParam, entityParam).Compile();
});

var result = await invoker(accessor, entity);

if (!result)
{
_logger.LogWarning("InsertSingle returned false for type {DataType}. Record may not have been inserted, possibly because it already exists.", dataType);
}

return result;
}
catch (DbUpdateException ex) when (IsPrimaryKeyViolation(ex))
{
_logger.LogWarning(ex, "Duplicate record detected for type {DataType}. Skipping insert.", dataType);
return false;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to insert record into database for type {DataType}.", dataType);
return false;
}
}

private async Task AppendToBlob(string dataType, string blobFileName, JsonElement newRecord)
{
try
{
var existingRecords = new List<JsonElement>();

var existingBlob = await _blobStorageHelper.GetFileFromBlobStorage(_storageConnectionString, _seedDataContainerName, blobFileName);
if (existingBlob?.Data != null)
{
existingBlob.Data.Position = 0;
using var reader = new StreamReader(existingBlob.Data);
var existingJson = await reader.ReadToEndAsync();
existingRecords = JsonSerializer.Deserialize<List<JsonElement>>(existingJson, JsonOptions) ?? new List<JsonElement>();
}

existingRecords.Add(newRecord);

var updatedJson = JsonSerializer.Serialize(existingRecords, JsonOptions);
var bytes = Encoding.UTF8.GetBytes(updatedJson);
var blobFile = new BlobFile(bytes, blobFileName);

await _blobStorageHelper.UploadFileToBlobStorage(_storageConnectionString, _seedDataContainerName, blobFile, overwrite: true);

_logger.LogInformation(
"Appended record to blob {BlobFileName} for type {DataType}. Total records: {Count}",
blobFileName, dataType, existingRecords.Count);
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"Failed to append record to blob for type {DataType}. DB insert was successful; blob is out of sync.",
dataType);
}
}

private static bool IsPrimaryKeyViolation(DbUpdateException ex)
{
const int SqlServerPrimaryKeyViolation = 2627;
const int SqlServerUniqueConstraintViolation = 2601;

for (Exception? current = ex.InnerException; current is not null; current = current.InnerException)
{
var numberProperty = current.GetType().GetProperty("Number");
if (numberProperty?.PropertyType == typeof(int))
{
var errorNumber = (int)numberProperty.GetValue(current)!;
if (errorNumber == SqlServerPrimaryKeyViolation || errorNumber == SqlServerUniqueConstraintViolation)
{
return true;
}
}
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
<OutputType>Exe</OutputType>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Functions.Worker" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.ServiceBus" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" />
</ItemGroup>
<ItemGroup>
<None Update="host.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="local.settings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<CopyToPublishDirectory>Never</CopyToPublishDirectory>
</None>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../../Shared/Common/Common.csproj" />
<ProjectReference Include="../../Shared/DataServices.Core/DataServices.Core.csproj" />
<ProjectReference Include="../../Shared/DataServices.Database/DataServices.Database.csproj" />
<ProjectReference Include="../../Shared/HealthChecks/HealthChecks.csproj" />
<ProjectReference Include="../../Shared/Model/Model.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
namespace ReferenceDataUpdater;

using System.Text.Json;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Model;

public class ReferenceDataUpdaterFunction
{
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNameCaseInsensitive = true
};

private readonly IReferenceDataInsertHandler _insertHandler;
private readonly ILogger<ReferenceDataUpdaterFunction> _logger;

public ReferenceDataUpdaterFunction(
IReferenceDataInsertHandler insertHandler,
ILogger<ReferenceDataUpdaterFunction> logger)
{
_insertHandler = insertHandler;
_logger = logger;
}

[Function(nameof(ReferenceDataUpdaterFunction))]
public async Task Run(
[ServiceBusTrigger(
topicName: "%ReferenceDataTopicName%",
subscriptionName: "%ReferenceDataSubscription%",
Connection = "ServiceBusConnectionString",
AutoCompleteMessages = false)]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions)
{
ReferenceDataUpdateMessage? updateMessage;
try
{
updateMessage = JsonSerializer.Deserialize<ReferenceDataUpdateMessage>(message.Body, JsonOptions);
}
catch (JsonException ex)
{
_logger.LogError(ex, "Failed to deserialise reference data update message.");
await messageActions.DeadLetterMessageAsync(message);
return;
}

if (updateMessage is null || string.IsNullOrWhiteSpace(updateMessage.DataType))
{
_logger.LogError("Reference data update message was null or missing DataType.");
await messageActions.DeadLetterMessageAsync(message);
return;
}

_logger.LogInformation(
"Processing reference data update | Type: {DataType} | CorrelationId: {CorrelationId}",
updateMessage.DataType, updateMessage.CorrelationId);

try
{
var success = await _insertHandler.ProcessRecord(updateMessage.DataType, updateMessage.Data);

if (!success)
{
_logger.LogError(
"Failed to process reference data update for type {DataType}.",
updateMessage.DataType);
await messageActions.DeadLetterMessageAsync(message);
return;
}

await messageActions.CompleteMessageAsync(message);
_logger.LogInformation(
"Reference data update completed | Type: {DataType} | CorrelationId: {CorrelationId}",
updateMessage.DataType, updateMessage.CorrelationId);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Unexpected error processing reference data update for type {DataType}. CorrelationId: {CorrelationId}",
updateMessage.DataType, updateMessage.CorrelationId);
await messageActions.DeadLetterMessageAsync(message);
}
}
}
Loading
Loading