-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathoutbox.py
More file actions
131 lines (104 loc) · 4.85 KB
/
outbox.py
File metadata and controls
131 lines (104 loc) · 4.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from datetime import datetime, timezone
from typing import Optional, Union

from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, validator  # pylint: disable=no-name-in-module

from ..common import MESH_MEDIA_TYPES, exclude_none_json_encoder
from ..models.message import Message
from . import RichMessageV1
class SendMessageV1(BaseModel):
    # Version 1 "send message" response body: a single field carrying the
    # identifier of the accepted message under the key ``messageID``.
    # (Plain comments are used instead of a docstring so the generated
    # schema description is left exactly as it was.)
    messageID: str = Field(
        description="message identifier of the accepted message",
    )

    class Config:
        # Schema title plus the example payload published with the schema.
        title = "send_message"
        json_schema_extra = {
            "example": {
                "messageID": "20220228174323222_ABCDEF",
            },
        }
class SendMessageV2(BaseModel):
    # Version 2 "send message" response body: same content as version 1 but
    # the identifier is exposed under the snake_case key ``message_id``.
    # (Plain comments are used instead of a docstring so the generated
    # schema description is left exactly as it was.)
    message_id: str = Field(
        description="message identifier of the accepted message",
    )

    class Config:
        # Schema title plus the example payload published with the schema.
        title = "send_message"
        json_schema_extra = {
            "example": {
                "message_id": "20220228174323222_ABCDEF",
            },
        }
class UploadChunkV1(BaseModel):
    # Version 1 "upload chunk" response body. Both fields are optional and
    # default to None; per their descriptions they echo values supplied in
    # the request url.
    messageID: Optional[str] = Field(default=None, description="message identifier, as supplied in the request url")  # type: ignore
    blockID: Optional[int] = Field(default=None, description="chunk number, as supplied in the request url")  # type: ignore

    class Config:
        # Schema title plus the example payload published with the schema.
        title = "upload_chunk"
        json_schema_extra = {
            "example": {
                "messageID": "20220228174323222_ABCDEF",
                "blockID": 3,
            },
        }
class OutboxMessageV1(RichMessageV1):
    # Outbox variant of RichMessageV1: adds assignment-time validation and a
    # validator that swaps falsy values for "" on selected text fields.
    # pylint: disable=E0213

    class Config:
        validate_assignment = True

    @validator(
        "local_id",
        "message_type",
        "status_code",
        "workflow_id",
        "recipient_name",
    )
    def prevent_nones(cls, name):
        # Any falsy value (None, "" ...) is returned as the empty string;
        # truthy values pass through untouched.
        return name if name else ""
class RichOutboxView(BaseModel):
    # Response model for an outbox listing: a creation timestamp, the
    # messages that were found, and a map of navigation links.
    # (Plain comments are used instead of a docstring so the generated
    # schema description is left exactly as it was.)
    valid_at: Optional[str] = Field(
        description="iso datetime that the result was created",
    )
    messages: list[OutboxMessageV1] = Field(
        description="list of found messages",
    )
    links: dict[str, str] = Field(
        description="map of links, e.g. links.next if more results exist",
    )

    class Config:
        validate_assignment = True
        title = "rich_outbox"
        # Example payload published with the schema.
        json_schema_extra = {
            "example": {
                "valid_at": "2021-11-22T14:35:52.29Z",
                "messages": [
                    {
                        "message_id": "20200601122152994285_D59900",
                        "expiry_timestamp": "2021-11-22T14:35:52.29Z",
                        "local_id": "api-docs-bob-sends-alice-a-chunked-file",
                        "message_type": "DATA",
                        "recipient": "X26HC005",
                        "recipient_name": "TUVTSCBVSSBUZXN0aW5nIDAx",
                        "sender": "X26HC006",
                        "sender_name": "APIM bebop",
                        "sent_date": "2021-11-22T14:35:52.29Z",
                        "status": "Accepted",
                        "status_code": "",
                        "workflow_id": "API-DOCS-TEST",
                    }
                ],
                "links": {
                    "self": "/messageexchange/mb12345/outbox?start_time=2022-05-20T14:35:52Z",
                    "next": (
                        "/messageexchange/mb12345/outbox?start_time=2022-05-20T14:35:52Z&continue_from="
                        "eyJwayI6ICJNQiNNU0cjTUIjMTIzNEhDMTIzNCMiLCAic2siOiAiTUIjTVNHIzIwMjIw"
                        "MjI4MTc0MzIzMTIzX0FDREVEMSMifQ%3D%3D"
                    ),
                },
            }
        }
def map_to_outbox_message(messages: list[Message]) -> list[OutboxMessageV1]:
    """Build one ``OutboxMessageV1`` per entry of *messages*, in order.

    Args:
        messages: messages to convert.

    Returns:
        A new list with the converted messages; empty when *messages* is empty.
    """
    converted: list[OutboxMessageV1] = []
    for message in messages:
        outbox_message = OutboxMessageV1(
            message_id=message.message_id,
            expiry_timestamp=message.inbox_expiry_timestamp,
            local_id=message.metadata.local_id,
            message_type=message.message_type,
            recipient=message.recipient.mailbox_id,
            recipient_name=message.recipient.mailbox_name,
            sender=message.sender.mailbox_id,
            sender_name=message.sender.mailbox_name,
            sent_date=message.created_timestamp,
            status=message.status,
            status_code=message.last_event.code,
            workflow_id=message.workflow_id,
            # A falsy chunk count (e.g. None) is reported as 0.
            total_chunks=message.total_chunks or 0,
        )
        converted.append(outbox_message)
    return converted
def get_rich_outbox_view(messages: list[Message], links: dict[str, str]) -> JSONResponse:
    """Wrap an outbox listing in a ``JSONResponse``.

    Args:
        messages: messages to include in the view.
        links: map of links to include in the view.

    Returns:
        A ``JSONResponse`` whose content is the ``RichOutboxView`` passed
        through ``exclude_none_json_encoder`` and whose media type is
        ``MESH_MEDIA_TYPES[2]``.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware
    # UTC "now" and drop the tzinfo so the ISO string keeps exactly the same
    # naive form (no "+00:00" suffix) that was emitted before.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None)

    view = RichOutboxView(
        valid_at=created_at.isoformat(),
        messages=map_to_outbox_message(messages),
        links=links,
    )
    return JSONResponse(
        content=exclude_none_json_encoder(view),
        media_type=MESH_MEDIA_TYPES[2],
    )
def send_message_response(message: Message, model_version: int = 1) -> Union[SendMessageV1, SendMessageV2]:
    """Build the "send message" response model for the given model version.

    Args:
        message: the message whose ``message_id`` is reported.
        model_version: versions below 2 produce ``SendMessageV1``; anything
            else produces ``SendMessageV2``.

    Returns:
        The response model carrying the message identifier.
    """
    if model_version < 2:
        response = SendMessageV1(messageID=message.message_id)
    else:
        response = SendMessageV2(message_id=message.message_id)
    return response
def upload_chunk_response(message: Message, chunk_number: int, model_version: int = 1) -> Optional[UploadChunkV1]:
    """Build the "upload chunk" response model for the given model version.

    Args:
        message: the message whose ``message_id`` is reported.
        chunk_number: chunk number reported as ``blockID``.
        model_version: only versions below 2 produce a response body.

    Returns:
        An ``UploadChunkV1`` when ``model_version`` is below 2, otherwise
        ``None``.
    """
    # Guard clause: from version 2 onwards no body is returned.
    if not model_version < 2:
        return None
    return UploadChunkV1(messageID=message.message_id, blockID=chunk_number)