converter.py
"""Lambda that transfers uploaded S3 objects to a destination bucket,
unwrapping the first part of multipart/* payloads and archiving sources."""
import logging
import os
from typing import BinaryIO
from urllib.parse import unquote_plus

import boto3
from smart_open import open  # intentionally shadows builtin open to handle s3:// URIs

EXPECTED_BUCKET_OWNER_ACCOUNT = os.getenv("ACCOUNT_ID")
DESTINATION_BUCKET_NAME = os.getenv("DESTINATION_BUCKET_NAME")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
s3_client = boto3.client('s3')


def parse_headers(headers_str: str) -> dict[str, str]:
    """Split CRLF-delimited header lines into a name -> value dict."""
    headers = dict(
        header_str.split(":", 1)
        for header_str in headers_str.split("\r\n")
        if ":" in header_str
    )
    return {k.strip(): v.strip() for k, v in headers.items()}
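# For example, parse_headers("Content-Type: text/plain\r\nX-Amz-Id: abc")
# returns {"Content-Type": "text/plain", "X-Amz-Id": "abc"}.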


def parse_header_value(header_value: str) -> tuple[str, dict[str, str]]:
    """Split a header value into its main value and any ;-separated parameters."""
    main_value, *params = header_value.split(";")
    parsed_params = dict(
        param.strip().split("=", 1)
        for param in params
        if "=" in param  # skip malformed/bare parameters rather than raising
    )
    parsed_params = {k: v.strip('"') for k, v in parsed_params.items()}
    return main_value, parsed_params
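# For example, parse_header_value('multipart/form-data; boundary="xyz"')
# returns ("multipart/form-data", {"boundary": "xyz"}).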


def read_until_part_start(input_file: BinaryIO, boundary: bytes) -> None:
    """Advance the stream past the opening boundary delimiter of the first part."""
    while line := input_file.readline():
        if line == b"--" + boundary + b"\r\n":
            return
    raise ValueError("Unexpected EOF")


def read_headers_bytes(input_file: BinaryIO) -> bytes:
    """Read raw header bytes up to (but not including) the blank line that ends them."""
    headers_bytes = b''
    while line := input_file.readline():
        if line == b"\r\n":
            return headers_bytes
        headers_bytes += line
    raise ValueError("Unexpected EOF")


def read_part_headers(input_file: BinaryIO) -> dict[str, str]:
    """Read and parse the headers of the current MIME part."""
    headers_bytes = read_headers_bytes(input_file)
    headers_str = headers_bytes.decode("utf-8")
    return parse_headers(headers_str)


def stream_part_body(input_file: BinaryIO, boundary: bytes, output_file: BinaryIO) -> None:
    """Copy the current part's body to output_file, stopping at the next boundary.

    Lines are written with a one-line delay so the CRLF that precedes the
    boundary delimiter can be excluded from the output.
    """
    previous_line = None
    found_part_end = False
    while line := input_file.readline():
        if line == b"--" + boundary + b"\r\n":
            logger.warning("Found additional part which will not be processed")
            found_part_end = True
        if line.startswith(b"--" + boundary + b"--"):
            found_part_end = True
        if found_part_end:
            if previous_line is not None:
                # The final \r\n is part of the encapsulation boundary, so strip
                # exactly one CRLF; rstrip(b'\r\n') would also remove legitimate
                # trailing newlines from the part body.
                if previous_line.endswith(b"\r\n"):
                    previous_line = previous_line[:-2]
                output_file.write(previous_line)
            return
        if previous_line is not None:
            output_file.write(previous_line)
        previous_line = line
    raise ValueError("Unexpected EOF")


def move_file(source_bucket: str, source_key: str, destination_bucket: str, destination_key: str) -> None:
    """Move an object within S3 via copy-then-delete (S3 has no native rename)."""
    s3_client.copy_object(
        CopySource={"Bucket": source_bucket, "Key": source_key},
        Bucket=destination_bucket,
        Key=destination_key,
        ExpectedBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT,
        ExpectedSourceBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT
    )
    s3_client.delete_object(
        Bucket=source_bucket,
        Key=source_key,
        ExpectedBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT
    )
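# Note: CopyObject only supports source objects up to 5 GB; larger objects
# would need a multipart copy (e.g. boto3's managed s3_client.copy transfer).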


def transfer_multipart_content(bucket_name: str, file_key: str, boundary: bytes, filename: str) -> None:
    """Stream the body of the first multipart part from S3 into the destination bucket."""
    with open(
        f"s3://{bucket_name}/{file_key}",
        "rb",
        transport_params={"client": s3_client}
    ) as input_file:
        read_until_part_start(input_file, boundary)
        headers = read_part_headers(input_file)

        # Prefer the filename and content type declared in the part's own headers
        content_disposition = headers.get("Content-Disposition")
        if content_disposition:
            _, content_disposition_params = parse_header_value(content_disposition)
            filename = content_disposition_params.get("filename") or filename
        content_type = headers.get("Content-Type") or "application/octet-stream"

        with open(
            f"s3://{DESTINATION_BUCKET_NAME}/streaming/{filename}",
            "wb",
            transport_params={
                "client": s3_client,
                "client_kwargs": {
                    "S3.Client.create_multipart_upload": {
                        "ContentType": content_type
                    }
                }
            }
        ) as output_file:
            stream_part_body(input_file, boundary, output_file)
    move_file(DESTINATION_BUCKET_NAME, f"streaming/{filename}", DESTINATION_BUCKET_NAME, filename)
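# The part is staged under a "streaming/" prefix and renamed afterwards,
# presumably so the object only appears at its final key once fully written.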


def process_record(record: dict) -> None:
    """Process one S3 event record: unwrap multipart content or copy as-is, then archive."""
    bucket_name = record["s3"]["bucket"]["name"]
    # Object keys in S3 event notifications are URL-encoded (spaces arrive as '+')
    file_key = unquote_plus(record["s3"]["object"]["key"])
    logger.info(f"Processing {file_key}")

    response = s3_client.head_object(
        Bucket=bucket_name,
        Key=file_key,
        ExpectedBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT
    )
    content_type = response['ContentType']
    media_type, content_type_params = parse_header_value(content_type)
    filename = response["Metadata"].get("mex-filename") or file_key

    # Handle multipart content by parsing the filename from the part headers and
    # streaming the content of the first part; anything else is copied verbatim
    if media_type.startswith("multipart/"):
        logger.info("Found multipart content")
        boundary = content_type_params["boundary"].encode("utf-8")
        transfer_multipart_content(bucket_name, file_key, boundary, filename)
    else:
        s3_client.copy_object(
            Bucket=DESTINATION_BUCKET_NAME,
            CopySource={"Bucket": bucket_name, "Key": file_key},
            Key=filename,
            ExpectedBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT,
            ExpectedSourceBucketOwner=EXPECTED_BUCKET_OWNER_ACCOUNT
        )
    logger.info(f"Transfer complete for {file_key}")

    move_file(bucket_name, file_key, bucket_name, f"archive/{file_key}")
    logger.info(f"Archived {file_key}")


def lambda_handler(event: dict, _context: dict) -> dict:
    success = True
    for record in event["Records"]:
        try:
            process_record(record)
        except Exception:
            logger.exception("Failed to process record")
            success = False
    return {
        'statusCode': 200,
        'body': 'Files converted and uploaded successfully!'
    } if success else {
        'statusCode': 500,
        'body': 'Errors occurred during processing'
    }
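

# A minimal sketch of the kind of S3 event this handler expects, useful for
# local testing (the bucket and key names here are hypothetical):
#
#   lambda_handler(
#       {"Records": [{"s3": {"bucket": {"name": "upload-bucket"},
#                            "object": {"key": "incoming/file.bin"}}}]},
#       None,
#   )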