Commit 95594bc

decorator
1 parent 56dcca9 commit 95594bc

14 files changed: 239 additions & 283 deletions

recordprocessor/src/clients.py

Lines changed: 7 additions & 1 deletion
@@ -1,10 +1,12 @@
 """Initialise s3 and kinesis clients"""
 
 import logging
+import os
 from boto3 import client as boto3_client, resource as boto3_resource
 from botocore.config import Config
+from redis_cacher import RedisCacher
 
-REGION_NAME = "eu-west-2"
+REGION_NAME = os.getenv("AWS_REGION")
 
 s3_client = boto3_client("s3", region_name=REGION_NAME)
 kinesis_client = boto3_client(
@@ -21,3 +23,7 @@
 logging.basicConfig(level="INFO")
 logger = logging.getLogger()
 logger.setLevel("INFO")
+
+REDIS_HOST = os.getenv("REDIS_HOST")
+REDIS_PORT = os.getenv("REDIS_PORT")
+redis_cacher = RedisCacher(REDIS_HOST, REDIS_PORT, logger)
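
One behavioural difference worth noting: os.getenv("AWS_REGION") returns None when the variable is unset, whereas the removed constant always held a value, and the same applies to REDIS_HOST and REDIS_PORT. A minimal defensive sketch (the fallback values below are assumptions, not part of this commit):

import os

# Hypothetical hardening, not in this commit: default to the previously
# hard-coded region and fail fast when Redis configuration is missing.
REGION_NAME = os.getenv("AWS_REGION", "eu-west-2")
REDIS_HOST = os.environ["REDIS_HOST"]  # raises KeyError when unset
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))  # redis-py expects a numeric port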

recordprocessor/src/constants.py

Lines changed: 5 additions & 0 deletions
@@ -84,3 +84,8 @@ class Urls:
     NHS_NUMBER = "https://fhir.nhs.uk/Id/nhs-number"
     NULL_FLAVOUR_CODES = "http://terminology.hl7.org/CodeSystem/v3-NullFlavor"
     VACCINATION_PROCEDURE = "https://fhir.hl7.org.uk/StructureDefinition/Extension-UKCore-VaccinationProcedure"
+
+
+class RedisCacheKeys:
+    PERMISSIONS_CONFIG_FILE_KEY = "permissions_config.json"
+    DISEASE_MAPPING_FILE_KEY = "disease_mapping.json"
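
These keys appear to double as the S3 object keys of the source JSON files (redis_sync's upload uses the file_key as the Redis key), so one identifier addresses both the bucket and the cache. A sketch of intended use, assuming the shared redis_cacher instance from clients.py:

from clients import redis_cacher
from constants import RedisCacheKeys

# Returns the parsed JSON config, or {} when the key is absent.
permissions_config = redis_cacher.get(RedisCacheKeys.PERMISSIONS_CONFIG_FILE_KEY)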

recordprocessor/src/mappings.py

Lines changed: 5 additions & 1 deletion
@@ -2,7 +2,8 @@
 
 from enum import Enum
 from typing import Dict, List
-from constants import Urls
+from constants import Urls, RedisCacheKeys
+from clients import redis_cacher
 
 
 class Vaccine(Enum):
@@ -70,3 +71,6 @@ def map_target_disease(vaccine: Vaccine) -> list:
         }
         for disease in diseases
     ]
+
+
+VACCINE_DISEASE_MAPPING = redis_cacher.get(RedisCacheKeys.DISEASE_MAPPING_FILE_KEY)
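
Because VACCINE_DISEASE_MAPPING is assigned at module level, the Redis lookup runs once at import time: if the key is absent the mapping stays pinned to an empty dict for the life of the process, and if the earlier connection attempt failed, the lookup raises because redis_client was never assigned. A lazy variant, shown purely as an illustrative alternative:

_vaccine_disease_mapping: dict = {}


def get_vaccine_disease_mapping() -> dict:
    # Fetch on first use and memoise, so a cold cache at import time
    # does not permanently leave the mapping empty.
    global _vaccine_disease_mapping
    if not _vaccine_disease_mapping:
        _vaccine_disease_mapping = redis_cacher.get(RedisCacheKeys.DISEASE_MAPPING_FILE_KEY)
    return _vaccine_disease_mapping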
recordprocessor/src/redis_cacher.py

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+import json
+import redis
+
+
+class RedisCacher():
+    """RedisCacher abstraction class to decouple application code
+    from direct use of the Redis client.
+    It also centralises error handling and leaves room for extension.
+    """
+
+    def __init__(self, redis_host, redis_port, logger):
+        try:
+            # Attempt to connect to Redis
+            self.redis_client = redis.StrictRedis(redis_host, redis_port, decode_responses=True)
+            # Check the connection with a PING command
+            if self.redis_client.ping():
+                logger.info("Successfully connected to Redis.")
+            else:
+                logger.error("Failed to connect to Redis.")
+        except Exception as e:
+            logger.exception(f"Connection to Redis failed: {e}")
+
+    def get(self, key: str) -> dict:
+        """Gets the value from the Redis cache for the given key."""
+        value = self.redis_client.get(key)
+        if value is not None:
+            return json.loads(value)
+        return {}
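
In this commit the class is instantiated once in clients.py and shared across modules. Note that the constructor swallows connection errors, so if the connection attempt raised, self.redis_client is never assigned and a later get() fails with AttributeError rather than returning the empty-dict fallback. A standalone usage sketch with placeholder connection details:

import logging

logging.basicConfig(level="INFO")
logger = logging.getLogger()

cacher = RedisCacher("localhost", 6379, logger)  # placeholder host/port
disease_mapping = cacher.get("disease_mapping.json")  # {} when the key is absent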
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+import unittest
+import json
+from unittest.mock import patch
+from redis_cacher import RedisCacher
+from constants import RedisCacheKeys
+
+
+class TestRedisCacher(unittest.TestCase):
+
+    def setUp(self):
+        # Mock S3Reader, transform_map and redis_client
+        self.s3_reader_patcher = patch("redis_cacher.S3Reader")
+        self.mock_s3_reader = self.s3_reader_patcher.start()
+        self.transform_map_patcher = patch("redis_cacher.transform_map")
+        self.mock_transform_map = self.transform_map_patcher.start()
+        self.redis_client_patcher = patch("redis_cacher.redis_client")
+        self.mock_redis_client = self.redis_client_patcher.start()
+
+    def tearDown(self):
+        self.s3_reader_patcher.stop()
+        self.transform_map_patcher.stop()
+        self.redis_client_patcher.stop()
+
+    def test_upload(self):
+        mock_data = '{"a": "b"}'
+        mock_transformed_data = '{"b": "c"}'
+        self.mock_s3_reader.read = unittest.mock.Mock()
+        self.mock_s3_reader.read.return_value = mock_data
+        self.mock_transform_map.return_value = mock_transformed_data
+
+        bucket_name = "bucket"
+        file_key = "file-key"
+        result = RedisCacher.upload(bucket_name, file_key)
+
+        self.mock_s3_reader.read.assert_called_once_with(bucket_name, file_key)
+        self.mock_transform_map.assert_called_once_with(mock_data, file_key)
+        self.mock_redis_client.set.assert_called_once_with(file_key, mock_transformed_data)
+        self.assertTrue(result)
+
+    def test_get_cached_config_json(self):
+        """Test getting cached config JSON from Redis."""
+        cache_key = "some-key"
+        cached_data = '{"some-data-key": "some-data-value"}'
+        self.mock_redis_client.get.return_value = cached_data
+        result = RedisCacher.get_cached_config_json(cache_key)
+        self.assertEqual(result, json.loads(cached_data))
+        self.mock_redis_client.get.assert_called_once_with(cache_key)
+
+    def test_get_cached_permissions_config_json(self):
+        cached_permissions = '{"permissions": "perm_config"}'
+        self.mock_redis_client.get.return_value = cached_permissions
+        result = RedisCacher.get_cached_permissions_config_json()
+        self.assertEqual(result, json.loads(cached_permissions))
+        self.mock_redis_client.get.assert_called_once_with(RedisCacheKeys.PERMISSIONS_CONFIG_FILE_KEY)
+
+    def test_get_cached_disease_mapping_json(self):
+        cached_disease_mapping = '{"disease": "disease-mapping"}'
+        self.mock_redis_client.get.return_value = cached_disease_mapping
+        result = RedisCacher.get_cached_disease_mapping_json()
+        self.assertEqual(result, json.loads(cached_disease_mapping))
+        self.mock_redis_client.get.assert_called_once_with(RedisCacheKeys.DISEASE_MAPPING_FILE_KEY)
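
Since every collaborator (S3Reader, transform_map, redis_client) is patched in setUp, the suite runs without a live Redis or AWS credentials. Assuming the module is saved as test_redis_cacher.py (the file name is not shown in this view), it can be run with the standard unittest runner:

python -m unittest test_redis_cacher -v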

redis_sync/src/clients.py

Lines changed: 1 addition & 0 deletions
@@ -13,5 +13,6 @@
 REDIS_PORT = os.getenv("REDIS_PORT", 6379)
 
 s3_client = boto3_client("s3", region_name=REGION_NAME)
+firehose_client = boto3_client("firehose", region_name=REGION_NAME)
 logger.info(f"Connecting to Redis at {REDIS_HOST}:{REDIS_PORT}")
 redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)

redis_sync/src/event_processor.py

Lines changed: 12 additions & 4 deletions
@@ -10,13 +10,21 @@
 def event_processor(event, context):
 
     try:
-
         logger.info("Processing S3 event with %d records", len(event.get('Records', [])))
         s3Event = S3Event(event)
+        record_count = len(s3Event.get_s3_records())
+        error_count = 0
         for record in s3Event.get_s3_records():
-            record_processor(record)
-        return True
+            record_result = record_processor(record)
+            if record_result["status"] == "error":
+                error_count += 1
+        if error_count > 0:
+            logger.error("Processed %d records with %d errors", record_count, error_count)
+            return {"status": "error", "message": f"Processed {record_count} records with {error_count} errors"}
+        else:
+            logger.info("Successfully processed all %d records", record_count)
+            return {"status": "success", "message": f"Successfully processed {record_count} records"}
 
     except Exception:
         logger.exception("Error processing S3 event")
-        return False
+        return {"status": "error", "message": "Error processing S3 event"}
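
For local testing, the handler can be exercised with a minimal S3 notification payload; the shape below follows the standard AWS S3 event format, with placeholder bucket and key names:

sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "config-bucket"}, "object": {"key": "permissions_config.json"}}}
    ]
}

result = event_processor(sample_event, context=None)
# On success: {"status": "success", "message": "Successfully processed 1 records"}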
redis_sync/src/logging_decorator.py

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
+"""This module contains the logging decorator for sending the appropriate logs to Cloudwatch and Firehose."""
+
+import json
+import os
+import time
+from datetime import datetime
+from functools import wraps
+from clients import firehose_client, logger
+
+STREAM_NAME = os.getenv("SPLUNK_FIREHOSE_NAME", "immunisation-fhir-api-internal-dev-splunk-firehose")
+
+
+def send_log_to_firehose(log_data: dict) -> None:
+    """Sends the log_data to Firehose"""
+    try:
+        record = {"Data": json.dumps({"event": log_data}).encode("utf-8")}
+        response = firehose_client.put_record(DeliveryStreamName=STREAM_NAME, Record=record)
+        logger.info("Log sent to Firehose: %s", response)  # TODO: Should we be logging the full response?
+    except Exception as error:  # pylint:disable = broad-exception-caught
+        logger.exception("Error sending log to Firehose: %s", error)
+
+
+def generate_and_send_logs(
+    start_time, base_log_data: dict, additional_log_data: dict, is_error_log: bool = False
+) -> None:
+    """Generates log data which includes the base_log_data, additional_log_data, and time taken (calculated using the
+    current time and given start_time) and sends them to Cloudwatch and Firehose."""
+    log_data = {**base_log_data, "time_taken": f"{round(time.time() - start_time, 5)}s", **additional_log_data}
+    log_function = logger.error if is_error_log else logger.info
+    log_function(json.dumps(log_data))
+    send_log_to_firehose(log_data)
+
+
+def logging_decorator(func):
+    """
+    Sends the appropriate logs to Cloudwatch and Firehose based on the function result.
+    NOTE: The function must return a dictionary as its only return value. The dictionary is expected to contain
+    all of the required additional details for logging.
+    NOTE: Logs will include the result of the function call or, in the case of an Exception being raised,
+    a status code of 500 and the error message.
+    """
+
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        base_log_data = {"function_name": f"filename_processor_{func.__name__}", "date_time": str(datetime.now())}
+        start_time = time.time()
+
+        try:
+            result = func(*args, **kwargs)
+            generate_and_send_logs(start_time, base_log_data, additional_log_data=result)
+            return result
+
+        except Exception as e:
+            additional_log_data = {"statusCode": 500, "error": str(e)}
+            generate_and_send_logs(start_time, base_log_data, additional_log_data, is_error_log=True)
+            raise
+
+    return wrapper
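
The contract is that the wrapped function returns a dict, which is merged into the log entry alongside the timing fields. An illustrative, hypothetical use:

@logging_decorator
def cache_file(bucket_name: str, file_key: str) -> dict:
    # ... upload work elided ...
    return {"status": "success", "file_key": file_key}

# A successful call logs (to CloudWatch and Firehose) a record shaped like:
# {"function_name": "filename_processor_cache_file", "date_time": "...",
#  "time_taken": "0.01234s", "status": "success", "file_key": "permissions_config.json"}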

redis_sync/src/record_processor.py

Lines changed: 8 additions & 3 deletions
@@ -1,13 +1,18 @@
 from clients import logger
 from s3_event import S3EventRecord
 from redis_cacher import RedisCacher
+from logging_decorator import logging_decorator
 '''
 Record Processor
 This module processes individual S3 records from an event.
 It is used to upload data to Redis ElastiCache.
 '''
 
 
+# NOTE: logging_decorator is applied to the record_processor function, rather than the lambda_handler, because
+# the logging_decorator is for an individual record, whereas the lambda_handler could potentially be handling
+# multiple records.
+@logging_decorator
 def record_processor(record: S3EventRecord) -> bool:
 
     try:
@@ -21,9 +26,9 @@ def record_processor(record: S3EventRecord) -> bool:
             return RedisCacher.upload(bucket_name, file_key)
 
         except Exception as error:  # pylint: disable=broad-except
-            logger.error("Error uploading to cache for file '%s': %s", file_key, error)
-            return False
+            logger.exception("Error uploading to cache for file '%s'", file_key)
+            return {"status": "error", "message": str(error)}
 
     except Exception:  # pylint: disable=broad-except
         logger.exception("Error obtaining file_key")
-        return False
+        return {"status": "error", "message": "Error obtaining file_key"}

redis_sync/src/redis_cacher.py

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@ def upload(bucket_name: str, file_key: str) -> None:
 
         # Use the file_key as the Redis key and the file content as the value
         redis_client.set(file_key, trx_data)
-        return True
+        return {"status": "success", "message": f"File {file_key} uploaded to Redis cache."}
 
     @staticmethod
     def get_cached_config_json(file_type) -> dict:
