11"""Functions for uploading the data to the ack file"""
22
3+ import json
34import os
45import time
5- from datetime import datetime
6+ from copy import deepcopy
7+ from datetime import datetime , timezone
68from io import BytesIO , StringIO
79
810from botocore .exceptions import ClientError
911
1012from common .aws_s3_utils import move_file
11- from common .batch .audit_table import get_record_count_and_failures_by_message_id , update_audit_table_item
13+ from common .batch .audit_table import (
14+ get_ingestion_start_time_by_message_id ,
15+ get_record_count_and_failures_by_message_id ,
16+ update_audit_table_item ,
17+ )
1218from common .clients import get_s3_client , logger
1319from common .log_decorator import generate_and_send_logs
14- from common .models .batch_constants import ACK_BUCKET_NAME , SOURCE_BUCKET_NAME , AuditTableKeys , FileStatus
20+ from common .models .batch_constants import (
21+ ACK_BUCKET_NAME ,
22+ SOURCE_BUCKET_NAME ,
23+ AuditTableKeys ,
24+ FileStatus ,
25+ )
1526from constants import (
1627 ACK_HEADERS ,
1728 BATCH_FILE_ARCHIVE_DIR ,
1829 BATCH_FILE_PROCESSING_DIR ,
30+ BATCH_REPORT_TITLE ,
31+ BATCH_REPORT_VERSION ,
1932 COMPLETED_ACK_DIR ,
2033 DEFAULT_STREAM_NAME ,
2134 LAMBDA_FUNCTION_NAME_PREFIX ,
2538STREAM_NAME = os .getenv ("SPLUNK_FIREHOSE_NAME" , DEFAULT_STREAM_NAME )
2639
2740
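+ # The helpers below build the JSON business ack. Illustratively (the field names come from the
+ # code below; the values here are examples only), the completed document has this shape:
+ # {
+ #     "system": BATCH_REPORT_TITLE,
+ #     "version": BATCH_REPORT_VERSION,
+ #     "generatedDate": "2025-06-01T09:30:00.000Z",
+ #     "provider": "<supplier>",
+ #     "filename": "<source file key without extension>",
+ #     "messageHeaderId": "<message id>",
+ #     "summary": {
+ #         "totalRecords": 100,
+ #         "succeeded": 98,
+ #         "failed": 2,
+ #         "ingestionTime": {"start": <epoch seconds>, "end": <epoch seconds>},
+ #     },
+ #     "failures": [{"rowId": 3, "responseCode": "...", ...}],
+ # }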
+ def _generated_date() -> str:
+     # Current UTC time formatted as e.g. "2025-06-01T09:30:00.000Z" (second precision, fixed ".000Z" suffix)
+     return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + ".000Z"
+ 
+ 
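+ # Builds the fixed "identifier information" fields of the JSON ack. The summary counts and
+ # generatedDate are filled in later by _add_ack_data_dict_summary once processing completes.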
+ def _make_ack_data_dict_identifier_information(
+     supplier: str, raw_ack_filename: str, message_id: str, ingestion_start_time: int
+ ) -> dict:
+     return {
+         "system": BATCH_REPORT_TITLE,
+         "version": BATCH_REPORT_VERSION,
+         "generatedDate": "",  # will be filled on completion
+         "provider": supplier,
+         "filename": raw_ack_filename,
+         "messageHeaderId": message_id,
+         "summary": {
+             "ingestionTime": {
+                 "start": ingestion_start_time,
+             }
+         },
+         "failures": [],
+     }
+ 
+ 
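+ # Stamps generatedDate and replaces the summary with the final record counts and ingestion end
+ # time. The original dict is not mutated; a deep copy is returned.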
+ def _add_ack_data_dict_summary(
+     existing_ack_data_dict: dict,
+     total_ack_rows_processed: int,
+     successful_record_count: int,
+     total_failures: int,
+     ingestion_end_time_seconds: int,
+ ) -> dict:
+     ack_data_dict = deepcopy(existing_ack_data_dict)
+     ack_data_dict["generatedDate"] = _generated_date()
+     ack_data_dict_summary_ingestion_time = {
+         "start": ack_data_dict["summary"]["ingestionTime"]["start"],
+         "end": ingestion_end_time_seconds,
+     }
+     ack_data_dict_summary = {
+         "totalRecords": total_ack_rows_processed,
+         "succeeded": successful_record_count,
+         "failed": total_failures,
+         "ingestionTime": ack_data_dict_summary_ingestion_time,
+     }
+     ack_data_dict["summary"] = ack_data_dict_summary
+     return ack_data_dict
+ 
+ 
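+ # Converts one CSV ack row into a JSON "failures" entry. Assumes MESSAGE_HEADER_ID has the
+ # form "<message id>^<row number>", so the numeric suffix becomes the rowId.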
+ def _make_json_ack_data_row(ack_data_row: dict) -> dict:
+     return {
+         "rowId": int(ack_data_row["MESSAGE_HEADER_ID"].split("^")[-1]),
+         "responseCode": ack_data_row["RESPONSE_CODE"],
+         "responseDisplay": ack_data_row["RESPONSE_DISPLAY"],
+         "severity": ack_data_row["ISSUE_SEVERITY"],
+         "localId": ack_data_row["LOCAL_ID"],
+         "operationOutcome": ack_data_row["OPERATION_OUTCOME"],
+     }
+ 
+ 
def create_ack_data(
    created_at_formatted_string: str,
    local_id: str,
@@ -71,28 +141,46 @@ def complete_batch_file_process(
    the audit table status"""
    start_time = time.time()

+     # finish CSV file
    ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"

    move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}")
    move_file(SOURCE_BUCKET_NAME, f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}")

    total_ack_rows_processed, total_failures = get_record_count_and_failures_by_message_id(message_id)
-     update_audit_table_item(
-         file_key=file_key, message_id=message_id, attrs_to_update={AuditTableKeys.STATUS: FileStatus.PROCESSED}
-     )
+     successful_record_count = total_ack_rows_processed - total_failures

    # Consider creating time utils and using datetime instead of time
-     ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time.gmtime())
-     successful_record_count = total_ack_rows_processed - total_failures
+     # Capture the ingestion end time once so the formatted timestamp and the epoch value written
+     # to the JSON ack refer to the same instant
+     ingestion_end_epoch = int(time.time())
+     ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time.gmtime(ingestion_end_epoch))
    update_audit_table_item(
        file_key=file_key,
        message_id=message_id,
        attrs_to_update={
            AuditTableKeys.RECORDS_SUCCEEDED: successful_record_count,
            AuditTableKeys.INGESTION_END_TIME: ingestion_end_time,
+             AuditTableKeys.STATUS: FileStatus.PROCESSED,
        },
    )

+     # finish JSON file
+     json_ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}"
+     temp_ack_file_key = f"{TEMP_ACK_DIR}/{json_ack_filename}"
+     ack_data_dict = obtain_current_json_ack_content(message_id, supplier, file_key, temp_ack_file_key)
+ 
+     ack_data_dict = _add_ack_data_dict_summary(
+         ack_data_dict,
+         total_ack_rows_processed,
+         successful_record_count,
+         total_failures,
+         ingestion_end_epoch,
+     )
+ 
+     # Upload ack_data_dict to S3
+     json_bytes = BytesIO(json.dumps(ack_data_dict, indent=2).encode("utf-8"))
+     get_s3_client().upload_fileobj(json_bytes, ACK_BUCKET_NAME, temp_ack_file_key)
+     move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{json_ack_filename}", f"{COMPLETED_ACK_DIR}/{json_ack_filename}")
+ 
    result = {
        "message_id": message_id,
        "file_key": file_key,
@@ -127,14 +215,14 @@ def log_batch_file_process(start_time: float, result: dict, function_name: str)
    generate_and_send_logs(STREAM_NAME, start_time, base_log_data, additional_log_data)


- def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
+ def obtain_current_csv_ack_content(temp_ack_file_key: str) -> StringIO:
    """Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
    try:
        # If ack file exists in S3 download the contents
        existing_ack_file = get_s3_client().get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key)
        existing_content = existing_ack_file["Body"].read().decode("utf-8")
    except ClientError as error:
        # If ack file does not exist in S3 create a new file containing the headers only
        if error.response["Error"]["Code"] in ("404", "NoSuchKey"):
            logger.info("No existing ack file found in S3 - creating new file")
            existing_content = "|".join(ACK_HEADERS) + "\n"
@@ -147,16 +235,42 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
    return accumulated_csv_content


- def update_ack_file(
+ def obtain_current_json_ack_content(message_id: str, supplier: str, file_key: str, temp_ack_file_key: str) -> dict:
239+ """Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
240+ try :
241+ # If ack file exists in S3 download the contents
242+ existing_ack_file = get_s3_client ().get_object (Bucket = ACK_BUCKET_NAME , Key = temp_ack_file_key )
243+ except ClientError as error :
244+ # If ack file does not exist in S3 create a new file containing the headers only
+         if error.response["Error"]["Code"] in ("404", "NoSuchKey"):
+             logger.info("No existing JSON ack file found in S3 - creating new file")
+ 
+             ingestion_start_time = get_ingestion_start_time_by_message_id(message_id)
+             raw_ack_filename = file_key.split(".")[0]
+ 
+             # Generate the initial fields
+             return _make_ack_data_dict_identifier_information(
+                 supplier,
+                 raw_ack_filename,
+                 message_id,
+                 ingestion_start_time,
+             )
+         else:
+             logger.error("error whilst obtaining current JSON ack content: %s", error)
+             raise
+ 
+     return json.loads(existing_ack_file["Body"].read().decode("utf-8"))
+ 
+ 
+ def update_csv_ack_file(
    file_key: str,
    created_at_formatted_string: str,
    ack_data_rows: list,
) -> None:
    """Updates the CSV ack file with the new data rows based on the given arguments"""
    ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
    temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}"
-     completed_ack_file_key = f"{COMPLETED_ACK_DIR}/{ack_filename}"
-     accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)
+     accumulated_csv_content = obtain_current_csv_ack_content(temp_ack_file_key)

    for row in ack_data_rows:
        data_row_str = [str(item) for item in row.values()]
@@ -166,4 +280,25 @@ def update_ack_file(
    csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))

    get_s3_client().upload_fileobj(csv_file_like_object, ACK_BUCKET_NAME, temp_ack_file_key)
-     logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, completed_ack_file_key)
+     logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, temp_ack_file_key)
+ 
+ 
+ def update_json_ack_file(
+     message_id: str,
+     supplier: str,
+     file_key: str,
+     created_at_formatted_string: str,
+     ack_data_rows: list,
+ ) -> None:
+     """Updates the JSON ack file with the new failure rows based on the given arguments"""
+     ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}"
+     temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}"
+     ack_data_dict = obtain_current_json_ack_content(message_id, supplier, file_key, temp_ack_file_key)
+ 
+     for row in ack_data_rows:
+         ack_data_dict["failures"].append(_make_json_ack_data_row(row))
+ 
+     # Upload ack_data_dict to S3
+     json_bytes = BytesIO(json.dumps(ack_data_dict, indent=2).encode("utf-8"))
+     get_s3_client().upload_fileobj(json_bytes, ACK_BUCKET_NAME, temp_ack_file_key)
+     logger.info("JSON ack file updated to %s: %s", ACK_BUCKET_NAME, temp_ack_file_key)
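+ 
+ 
+ # Illustrative usage of update_json_ack_file (all values below are hypothetical placeholders,
+ # not real identifiers or file names):
+ #
+ # update_json_ack_file(
+ #     message_id="example-message-id",
+ #     supplier="EXAMPLE_SUPPLIER",
+ #     file_key="example_batch_file.csv",
+ #     created_at_formatted_string="20250101T00000000",
+ #     ack_data_rows=[
+ #         {
+ #             "MESSAGE_HEADER_ID": "example-message-id^3",
+ #             "RESPONSE_CODE": "example-response-code",
+ #             "RESPONSE_DISPLAY": "example-response-display",
+ #             "ISSUE_SEVERITY": "example-severity",
+ #             "LOCAL_ID": "example-local-id",
+ #             "OPERATION_OUTCOME": "example-operation-outcome",
+ #         }
+ #     ],
+ # )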