Skip to content

Commit 4f6400d

Browse files
committed
implementation
1 parent 784006e commit 4f6400d

4 files changed

Lines changed: 33 additions & 20 deletions

File tree

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
from redis_cacher import RedisCacher
22
from common.clients import logger
33
from common.s3_event import S3EventRecord
4+
from common.service_return import ServiceReturn
45
'''
56
Record Processor
67
This module processes individual S3 records from an event.
78
It is used to upload data to Redis ElastiCache.
89
'''
910

1011

11-
def process_record(record: S3EventRecord) -> dict:
12+
def process_record(record: S3EventRecord) -> ServiceReturn:
1213
try:
1314
logger.info("Processing S3 record, bucket: %s, key: %s",
1415
record.get_bucket_name(), record.get_object_key())
@@ -20,17 +21,21 @@ def process_record(record: S3EventRecord) -> dict:
2021
}
2122

2223
try:
23-
result = RedisCacher.upload(bucket_name, file_key)
24-
result.update(base_log_data)
25-
return result
24+
service_result = RedisCacher.upload(bucket_name, file_key)
25+
if service_result.is_success:
26+
result = service_result.value
27+
result.update(base_log_data)
28+
return ServiceReturn(value=result)
29+
else:
30+
return ServiceReturn(
31+
status=500,
32+
message=f"Failed to upload to cache for filename '{file_key}': {service_result.message}")
2633

2734
except Exception as error: # pylint: disable=broad-except
2835
logger.exception("Error uploading to cache for filename '%s'", file_key)
2936
error_data = {"status": "error", "message": str(error)}
3037
error_data.update(base_log_data)
31-
return error_data
38+
return ServiceReturn(value=error_data)
3239

3340
except Exception: # pylint: disable=broad-except
34-
msg = "Error processing record"
35-
logger.exception(msg)
36-
return {"status": "error", "message": msg}
41+
return ServiceReturn(value={"status": "error", "message": "Error processing record"})

lambdas/redis_sync/src/redis_cacher.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class RedisCacher:
1212
"""Class to handle interactions with ElastiCache (Redis) for configuration files."""
1313

1414
@staticmethod
15-
def upload(bucket_name: str, file_key: str) -> dict:
15+
def upload(bucket_name: str, file_key: str) -> ServiceReturn:
1616
try:
1717
logger.info("Upload from s3 to Redis cache. file '%s'. bucket '%s'", file_key, bucket_name)
1818

@@ -46,8 +46,8 @@ def upload(bucket_name: str, file_key: str) -> dict:
4646
redis_client.hdel(key, *fields_to_delete)
4747
logger.info("Deleted mapping fields for %s: %s", key, fields_to_delete)
4848

49-
return {"status": "success", "message": f"File {file_key} uploaded to Redis cache."}
49+
return ServiceReturn(value={"status": "success", "message": f"File {file_key} uploaded to Redis cache."})
5050
except Exception:
5151
msg = f"Error uploading file '{file_key}' to Redis cache"
5252
logger.exception(msg)
53-
return {"status": "error", "message": msg}
53+
return ServiceReturn(status=500, message=msg)

lambdas/redis_sync/src/redis_sync.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from common.log_decorator import logging_decorator
55
from common.redis_client import get_redis_client
66
from common.s3_event import S3Event
7+
from common.service_return import ServiceReturn
78
'''
89
Event Processor
910
The Business Logic for the Redis Sync Lambda Function.
@@ -15,18 +16,20 @@ def _process_all_records(s3_records: list) -> dict:
1516
error_count = 0
1617
file_keys = []
1718
for record in s3_records:
18-
record_result = process_record(record)
19-
file_keys.append(record_result["file_key"])
20-
if record_result["status"] == "error":
19+
service_result = process_record(record)
20+
file_keys.append(service_result.value.get("file_key"))
21+
if service_result.status == 500:
2122
error_count += 1
2223
if error_count > 0:
2324
logger.error("Processed %d records with %d errors", record_count, error_count)
24-
return {"status": "error", "message": f"Processed {record_count} records with {error_count} errors",
25-
"file_keys": file_keys}
25+
return ServiceReturn(value={"status": "error", "message": f"Processed {record_count} records with {error_count} errors",
26+
"file_keys": file_keys})
2627
else:
2728
logger.info("Successfully processed all %d records", record_count)
28-
return {"status": "success", "message": f"Successfully processed {record_count} records",
29-
"file_keys": file_keys}
29+
return ServiceReturn(
30+
value={"status": "success",
31+
"message": f"Successfully processed {record_count} records",
32+
"file_keys": file_keys})
3033

3134

3235
@logging_decorator(prefix="redis_sync", stream_name=STREAM_NAME)
@@ -44,7 +47,12 @@ def handler(event, _):
4447
logger.info(no_records)
4548
return {"status": "success", "message": no_records}
4649
else:
47-
return _process_all_records(s3_records)
50+
service_result = _process_all_records(s3_records)
51+
if service_result.is_success:
52+
return service_result.value
53+
else:
54+
return {"status": "error", "message": service_result.value.get("message"),
55+
"file_keys": service_result.value.get("file_keys", [])}
4856
else:
4957
logger.info(no_records)
5058
return {"status": "success", "message": no_records}

lambdas/shared/src/common/s3_reader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class S3Reader:
1212
"""
1313

1414
@staticmethod
15-
def read(bucket_name, file_key):
15+
def read(bucket_name, file_key) -> ServiceReturn:
1616
try:
1717
s3_file = s3_client.get_object(Bucket=bucket_name, Key=file_key)
1818
return ServiceReturn(value=s3_file["Body"].read().decode("utf-8"))

0 commit comments

Comments
 (0)