-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathadmin.py
More file actions
131 lines (109 loc) · 4.7 KB
/
admin.py
File metadata and controls
131 lines (109 loc) · 4.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from typing import Optional
from uuid import uuid4
from fastapi import BackgroundTasks, Depends, HTTPException, status
from mesh_sandbox.common.exceptions import create_ndr
from ..common.messaging import Messaging
from ..dependencies import get_messaging
from ..models.mailbox import Mailbox
from ..models.message import (
Message,
MessageEvent,
MessageMetadata,
MessageParty,
MessageStatus,
MessageType,
)
from ..views.admin import (
AddMessageEventRequest,
CreateReportRequest,
MailboxDetails,
MessageDetails,
)
class AdminHandler:
    """Admin operations (reset, report creation, event injection, lookups).

    Every operation is delegated to the injected ``Messaging`` instance; this
    class only translates missing resources / unsupported modes into
    ``HTTPException`` responses and assembles the objects to be stored.
    """

    def __init__(self, messaging: Messaging = Depends(get_messaging)):
        """Store the ``Messaging`` dependency resolved by FastAPI."""
        self.messaging = messaging

    async def reset(self, mailbox_id: Optional[str] = None):
        """Reset the whole store, or a single mailbox when ``mailbox_id`` is given.

        Args:
            mailbox_id: Mailbox to reset. When omitted or empty, the entire
                messaging store is reset via ``Messaging.reset``.

        Raises:
            HTTPException: 405 when the messaging store is read-only;
                404 when ``mailbox_id`` is given but no such mailbox exists.
        """
        if self.messaging.readonly:
            raise HTTPException(
                status_code=status.HTTP_405_METHOD_NOT_ALLOWED,
                detail="reset not supported for current store mode",
            )

        if not mailbox_id:
            await self.messaging.reset()
            return

        # NOTE(review): accessed=False presumably prevents this admin lookup
        # from being recorded as a mailbox access - confirm against Messaging.
        mailbox = await self.messaging.get_mailbox(mailbox_id, accessed=False)
        if not mailbox:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="mailbox does not exist")

        await self.messaging.reset_mailbox(mailbox.mailbox_id)

    async def create_report(self, request: CreateReportRequest, background_tasks: BackgroundTasks) -> Message:
        """Create a report message for ``request.mailbox_id`` and send it.

        An ``UNDELIVERABLE`` status produces a report via ``create_ndr``; an
        ``ERROR`` status produces a ``REPORT`` message built here. The message
        is sent with an empty body.

        Args:
            request: Details of the report to create.
            background_tasks: Passed through to ``Messaging.send_message``.

        Returns:
            The message that was sent.

        Raises:
            HTTPException: 404 when the recipient mailbox does not exist;
                400 when ``request.status`` is neither ``UNDELIVERABLE`` nor
                ``ERROR``.
        """
        recipient = await self.messaging.get_mailbox(request.mailbox_id, accessed=False)
        if not recipient:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="mailbox does not exist")

        # Explicit check rather than `assert`: assertions are removed when
        # Python runs with -O, which would let an unsupported status fall
        # through and be sent as an error report.
        if request.status not in (MessageStatus.UNDELIVERABLE, MessageStatus.ERROR):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="report status must be undeliverable or error",
            )

        if request.status == MessageStatus.UNDELIVERABLE:
            message = create_ndr(request, recipient)
        else:
            message = self._build_error_report(request, recipient)

        await self.messaging.send_message(message=message, body=b"", background_tasks=background_tasks)
        return message

    @staticmethod
    def _build_error_report(request: CreateReportRequest, recipient: Mailbox) -> Message:
        """Build the ``REPORT`` message addressed to ``recipient`` for an error status."""
        return Message(
            events=[
                MessageEvent(status=MessageStatus.ACCEPTED),
                MessageEvent(
                    status=request.status,
                    event="TRANSFER",
                    code=request.code,
                    description=request.description,
                    linked_message_id=request.linked_message_id,
                ),
            ],
            message_id=uuid4().hex.upper(),
            # The sender is a fixed set of values with an empty mailbox id,
            # not a mailbox looked up from the store.
            sender=MessageParty(
                mailbox_id="",
                mailbox_name="Central System Mailbox",
                ods_code="X26",
                org_code="X26",
                org_name="NHS England",
                billing_entity="England",
            ),
            recipient=MessageParty(
                mailbox_id=recipient.mailbox_id,
                mailbox_name=recipient.mailbox_name,
                ods_code=recipient.ods_code,
                org_code=recipient.org_code,
                org_name=recipient.org_name,
                billing_entity=recipient.billing_entity,
            ),
            total_chunks=0,
            message_type=MessageType.REPORT,
            workflow_id=request.workflow_id,
            metadata=MessageMetadata(
                subject=request.subject,
                local_id=request.local_id,
                file_name=request.file_name,
            ),
        )

    async def add_message_event(
        self, message_id: str, new_event: AddMessageEventRequest, background_tasks: BackgroundTasks
    ):
        """Append an event built from ``new_event`` to an existing message.

        Args:
            message_id: Identifier of the message to update.
            new_event: Field values for the event to add.
            background_tasks: Passed through to ``Messaging.add_message_event``.

        Returns:
            Whatever ``Messaging.add_message_event`` returns for the message.

        Raises:
            HTTPException: 404 when no message with ``message_id`` exists.
        """
        message = await self.messaging.get_message(message_id)
        if not message:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

        event = MessageEvent(
            status=new_event.status,
            code=new_event.code,
            event=new_event.event,
            description=new_event.description,
            linked_message_id=new_event.linked_message_id,
        )

        message = await self.messaging.add_message_event(message, event, background_tasks)
        return message

    async def get_mailbox_details(self, mailbox_id: str) -> MailboxDetails:
        """Return the ``MailboxDetails`` view of a mailbox.

        Raises:
            HTTPException: 404 when no mailbox with ``mailbox_id`` exists.
        """
        mailbox: Optional[Mailbox] = await self.messaging.get_mailbox(mailbox_id)
        if not mailbox:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

        return MailboxDetails.from_mailbox(mailbox)

    async def get_message_details(self, message_id: str) -> MessageDetails:
        """Return the ``MessageDetails`` view of a message.

        Raises:
            HTTPException: 404 when no message with ``message_id`` exists.
        """
        message: Optional[Message] = await self.messaging.get_message(message_id)
        if not message:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

        return MessageDetails.from_message(message)