This repository was archived by the owner on Jul 28, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathFileExtractFunction.cs
More file actions
118 lines (101 loc) · 4.46 KB
/
FileExtractFunction.cs
File metadata and controls
118 lines (101 loc) · 4.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
using Google.Protobuf.WellKnownTypes;
using Microsoft.Azure.Functions.Worker;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using NHS.MESH.Client.Contracts.Services;
using ServiceLayer.Data;
using ServiceLayer.Data.Models;
using ServiceLayer.Mesh.Extensions;
using ServiceLayer.Mesh.Messaging;
using ServiceLayer.Mesh.Storage;
namespace ServiceLayer.Mesh.Functions;
public class FileExtractFunction(
ILogger<FileExtractFunction> logger,
IMeshInboxService meshInboxService,
ServiceLayerDbContext serviceLayerDbContext,
IFileTransformQueueClient fileTransformQueueClient,
IFileExtractQueueClient fileExtractQueueClient,
IMeshFilesBlobStore meshFileBlobStore)
{
[Function("FileExtractFunction")]
public async Task Run([QueueTrigger("%FileExtractQueueName%")] FileExtractQueueMessage message)
{
logger.LogInformation("{FunctionName} started. Processing fileId: {FileId}", nameof(FileExtractFunction), message.FileId);
await using var transaction = await serviceLayerDbContext.Database.BeginTransactionAsync();
var file = await GetFileAsync(message.FileId);
if (file == null || !IsFileSuitableForExtraction(file))
{
return;
}
await UpdateFileStatusForExtraction(file);
await transaction.CommitAsync();
try
{
await ProcessFileExtraction(file);
}
catch (Exception ex)
{
await HandleExtractionError(file, message, ex);
}
}
private async Task<MeshFile?> GetFileAsync(string fileId)
{
var file = await serviceLayerDbContext.MeshFiles
.FirstOrDefaultAsync(f => f.FileId == fileId);
if (file == null)
{
logger.LogWarning("File with id: {FileId} not found in MeshFiles table.", fileId);
}
return file;
}
private bool IsFileSuitableForExtraction(MeshFile file)
{
// We only want to extract files if they are in a Discovered state,
// or are in an Extracting state and were last touched over 12 hours ago.
var expectedStatuses = new[] { MeshFileStatus.Discovered, MeshFileStatus.Extracting };
if (!expectedStatuses.Contains(file.Status) ||
(file.Status == MeshFileStatus.Extracting && file.LastUpdatedUtc > DateTime.UtcNow.AddHours(-12)))
{
logger.LogWarning(
"File with id: {FileId} found in MeshFiles table but is not suitable for extraction. Status: {Status}, LastUpdatedUtc: {LastUpdatedUtc}.",
file.FileId,
file.Status,
file.LastUpdatedUtc.ToTimestamp());
return false;
}
return true;
}
private async Task UpdateFileStatusForExtraction(MeshFile file)
{
file.Status = MeshFileStatus.Extracting;
file.LastUpdatedUtc = DateTime.UtcNow;
await serviceLayerDbContext.SaveChangesAsync();
}
private async Task ProcessFileExtraction(MeshFile file)
{
var meshResponse = await meshInboxService.GetMessageByIdAsync(file.MailboxId, file.FileId);
if (!meshResponse.IsSuccessful)
{
throw new InvalidOperationException($"Mesh extraction failed: [ {meshResponse.Error.ToFormattedString()} ]");
}
var blobPath = await meshFileBlobStore.UploadAsync(file, meshResponse.Response.FileAttachment.Content);
var meshAcknowledgementResponse = await meshInboxService.AcknowledgeMessageByIdAsync(file.MailboxId, file.FileId);
if (!meshAcknowledgementResponse.IsSuccessful)
{
logger.LogWarning("Mesh acknowledgement failed: [ {ToFormattedString} ].\nThis is not a fatal error so processing will continue.", meshAcknowledgementResponse.Error.ToFormattedString());
}
file.BlobPath = blobPath;
file.Status = MeshFileStatus.Extracted;
file.LastUpdatedUtc = DateTime.UtcNow;
await serviceLayerDbContext.SaveChangesAsync();
await fileTransformQueueClient.EnqueueFileTransformAsync(file);
}
private async Task HandleExtractionError(MeshFile file, FileExtractQueueMessage message, Exception ex)
{
logger.LogError(ex, "An exception occurred during file extraction for fileId: {FileId}", message.FileId);
file.Status = MeshFileStatus.FailedExtract;
file.LastUpdatedUtc = DateTime.UtcNow;
await serviceLayerDbContext.SaveChangesAsync();
await fileExtractQueueClient.SendToPoisonQueueAsync(message);
}
}