-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathRetrieveMeshFile.cs
More file actions
185 lines (161 loc) · 7.22 KB
/
RetrieveMeshFile.cs
File metadata and controls
185 lines (161 loc) · 7.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
namespace NHS.Screening.RetrieveMeshFile;
using Common;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Model;
using NHS.MESH.Client.Models;
using System;
using System.Globalization;
using System.Text.Json;
using System.Threading.Tasks;
public class RetrieveMeshFile
{
private readonly ILogger<RetrieveMeshFile> _logger;
private readonly IMeshToBlobTransferHandler _meshToBlobTransferHandler;
private readonly string _mailboxId;
private readonly string? _blobConnectionString;
private readonly Uri? _blobServiceUri;
private readonly IBlobStorageHelper _blobStorageHelper;
private readonly RetrieveMeshFileConfig _config;
private const string NextHandShakeTimeConfigKey = "NextHandShakeTime";
private const string ConfigFileName = "MeshState.json";
private const string ConfigContainerName = "config";
public RetrieveMeshFile(ILogger<RetrieveMeshFile> logger, IMeshToBlobTransferHandler meshToBlobTransferHandler, IBlobStorageHelper blobStorageHelper, IOptions<RetrieveMeshFileConfig> options)
{
_logger = logger;
_meshToBlobTransferHandler = meshToBlobTransferHandler;
_blobStorageHelper = blobStorageHelper;
_mailboxId = options.Value.BSSMailBox;
_config = options.Value;
if (_config.nemsmeshfolder_STORAGE != null)
{
_blobServiceUri = new Uri(_config.nemsmeshfolder_STORAGE.BlobServiceUri);
}
else
{
_blobConnectionString = Environment.GetEnvironmentVariable("nemsmeshfolder_STORAGE");
}
}
/// <summary>
/// This function polls the MESH Mailbox every 5 minutes, if there is a file posted to the mailbox.
/// If there is a file in there will move the file to the Cohort Manager Blob Storage where it will be picked up by the ReceiveCaasFile Function.
/// </summary>
[Function("RetrieveMeshFile")]
public async Task RunAsync([TimerTrigger("0 */5 * * * *")] TimerInfo myTimer)
{
_logger.LogInformation("C# Timer trigger function executed at: ,{DateTime}", DateTime.UtcNow);
static bool messageFilter(MessageMetaData i) => true; // No current filter defined there might be business rules here
static string fileNameFunction(MessageMetaData i) => string.Concat(i.MessageId, "_-_", i.WorkflowID, ".parquet");
try
{
var shouldExecuteHandShake = await ShouldExecuteHandShake();
var result = await _meshToBlobTransferHandler.MoveFilesFromMeshToBlob(messageFilter, fileNameFunction, _mailboxId, _blobServiceUri, _blobConnectionString, "inbound", shouldExecuteHandShake);
if (!result)
{
_logger.LogError("An error was encountered while moving files from Mesh to Blob");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "An error encountered while moving files from Mesh to Blob");
}
if (myTimer.ScheduleStatus is not null)
{
_logger.LogInformation("Next timer schedule at: {ScheduleStatus}", myTimer.ScheduleStatus.Next);
}
}
private async Task<bool> ShouldExecuteHandShake()
{
Dictionary<string, string> configValues;
TimeSpan handShakeInterval = new TimeSpan(0, 23, 54, 0);
BlobFile? meshState = null;
if (_blobServiceUri != null)
{
meshState = await _blobStorageHelper.GetFileFromBlobStorage(_blobServiceUri, ConfigContainerName, ConfigFileName);
}
else if (_blobConnectionString != null)
{
meshState = await _blobStorageHelper.GetFileFromBlobStorage(_blobConnectionString, ConfigContainerName, ConfigFileName);
}
if (meshState == null)
{
_logger.LogInformation("MeshState File did not exist, Creating new MeshState File in blob Storage");
configValues = new Dictionary<string, string>
{
{ NextHandShakeTimeConfigKey, DateTime.UtcNow.Add(handShakeInterval).ToString() }
};
await SetConfigState(configValues);
return true;
}
using (StreamReader reader = new StreamReader(meshState.Data))
{
meshState.Data.Seek(0, SeekOrigin.Begin);
string jsonData = await reader.ReadToEndAsync();
configValues = JsonSerializer.Deserialize<Dictionary<string, string>>(jsonData);
}
string nextHandShakeDateString;
//config value doenst exist
if (!configValues.TryGetValue(NextHandShakeTimeConfigKey, out nextHandShakeDateString))
{
_logger.LogInformation("NextHandShakeTime config item does not exist, creating new config item");
configValues.Add(NextHandShakeTimeConfigKey, DateTime.UtcNow.Add(handShakeInterval).ToString());
await SetConfigState(configValues);
return true;
}
DateTime nextHandShakeDateTime;
//date cannot be parsed
if (!DateTime.TryParse(nextHandShakeDateString, CultureInfo.InvariantCulture, out nextHandShakeDateTime))
{
_logger.LogInformation("Unable to Parse NextHandShakeTime, Updating config value");
configValues[NextHandShakeTimeConfigKey] = DateTime.UtcNow.Add(handShakeInterval).ToString();
SetConfigState(configValues);
return true;
}
if (DateTime.Compare(nextHandShakeDateTime, DateTime.UtcNow) <= 0)
{
_logger.LogInformation("Next HandShakeTime was in the past, will execute handshake");
var NextHandShakeTimeConfig = DateTime.UtcNow.Add(handShakeInterval).ToString();
configValues[NextHandShakeTimeConfigKey] = NextHandShakeTimeConfig;
_logger.LogInformation("Next Handshake scheduled for {NextHandShakeTimeConfig}", NextHandShakeTimeConfig);
return true;
}
_logger.LogInformation("Next handshake scheduled for {NextHandShakeDateTime}", nextHandShakeDateTime);
return false;
}
private async Task<bool> SetConfigState(Dictionary<string, string> state)
{
try
{
string jsonString = JsonSerializer.Serialize(state);
using (var stream = GenerateStreamFromString(jsonString))
{
var blobFile = new BlobFile(stream, ConfigFileName);
var result = false;
if (_blobServiceUri != null)
{
result = await _blobStorageHelper.UploadFileToBlobStorage(_blobServiceUri, ConfigContainerName, blobFile, true);
}
else if (_blobConnectionString != null)
{
result = await _blobStorageHelper.UploadFileToBlobStorage(_blobConnectionString, ConfigContainerName, blobFile, true);
}
return result;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Unable To set Config State");
return false;
}
}
public static Stream GenerateStreamFromString(string s)
{
var stream = new MemoryStream();
var writer = new StreamWriter(stream);
writer.Write(s);
writer.Flush();
stream.Position = 0;
return stream;
}
}