-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathclient.py
More file actions
245 lines (198 loc) · 7.32 KB
/
client.py
File metadata and controls
245 lines (198 loc) · 7.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
"""
SDS (Spine Directory Service) FHIR R4 device and endpoint lookup client.
This module provides a client for querying the Spine Directory Service to retrieve:
- Device records (including ASID - Accredited System ID)
- Endpoint records (including endpoint URLs for routing)
"""
from __future__ import annotations
import logging
import os
from enum import StrEnum
from typing import Any
from fhir import Resource
from fhir.constants import FHIRSystem
from fhir.r4 import Bundle, Device, Endpoint
from requests import HTTPError, Response
from requests import get as external_sds_get
from stubs import SdsFhirApiStub
from gateway_api.common.error import SdsRequestFailedError
from gateway_api.get_structured_record import (
ACCESS_RECORD_STRUCTURED_INTERACTION_ID,
SDS_SANDBOX_INTERACTION_ID,
)
from gateway_api.sds.search_results import SdsSearchResults
def get(
url: str,
headers: dict[str, str],
params: dict[str, str],
timeout: int,
) -> Response:
STUB_SDS = os.environ["SDS_URL"].lower() == "stub"
if not STUB_SDS:
return external_sds_get(url, headers=headers, params=params, timeout=timeout)
else:
return SdsFhirApiStub().get(
url, headers=headers, params=params, timeout=timeout
)
_logger = logging.getLogger(__name__)
class SdsResourceType(StrEnum):
"""SDS FHIR resource types."""
DEVICE = "Device"
ENDPOINT = "Endpoint"
class SdsClient:
"""
Simple client for SDS FHIR R4 device and endpoint retrieval.
The client supports:
* :meth:`get_org_details` - Retrieves ASID and endpoint for an organization
This method returns a :class:`SdsSearchResults` instance when data can be
extracted, otherwise ``None``.
**Stubbing**:
For testing, set the environment variable ``$SDS_URL`` to use the
:class:`SdsFhirApiStub` instead of making real HTTP requests.
**Usage example**::
sds = SdsClient(
base_url="https://sandbox.api.service.nhs.uk/spine-directory/FHIR/R4",
timeout=10,
service_interaction_id="urn:nhs:names:services:gpconnect:fhir:rest:read:metadata-1",
)
result = sds.get_org_details("A12345")
if result:
print(f"ASID: {result.asid}, Endpoint: {result.endpoint}")
"""
# Default service interaction ID for GP Connect
DEFAULT_SERVICE_INTERACTION_ID = ACCESS_RECORD_STRUCTURED_INTERACTION_ID
def __init__(
self,
base_url: str,
api_key: str,
timeout: int = 10,
service_interaction_id: str | None = None,
) -> None:
self.base_url = base_url.rstrip("/")
self.timeout = timeout
self.api_key = api_key
if service_interaction_id is not None:
self.service_interaction_id = service_interaction_id
elif (
self.base_url
== "https://sandbox.api.service.nhs.uk/spine-directory/FHIR/R4"
):
self.service_interaction_id = SDS_SANDBOX_INTERACTION_ID
else:
self.service_interaction_id = self.DEFAULT_SERVICE_INTERACTION_ID
log_details = {
"description": "Initialized SdsClient",
"base_url": self.base_url,
"service_interaction_id": self.service_interaction_id,
}
_logger.info(log_details)
def _build_headers(self, correlation_id: str | None = None) -> dict[str, str]:
"""
Build mandatory and optional headers for an SDS request.
"""
headers = {
"Accept": "application/fhir+json",
"apikey": self.api_key,
}
if correlation_id:
headers["X-Correlation-Id"] = correlation_id
return headers
def get_org_details(
self,
ods_code: str,
correlation_id: str | None = None,
timeout: int | None = None,
get_endpoint: bool = True,
) -> SdsSearchResults:
"""
Retrieve ASID and endpoint for an organization by ODS code.
This method performs two SDS queries:
1. Query /Device to get the ASID for the organization
2. Query /Endpoint to get the endpoint URL (if available)
"""
# Step 1: Get Device to obtain ASID
device_bundle = self._query_sds(
ods_code=ods_code,
correlation_id=correlation_id,
timeout=timeout,
querytype=SdsResourceType.DEVICE,
)
device = self._extract_first_resource(device_bundle, Device)
if not device:
empty_search_results = SdsSearchResults(asid=None, endpoint=None)
return empty_search_results
asid = self._extract_device_identifier(device, FHIRSystem.NHS_SPINE_ASID)
# Step 2: Get Endpoint to obtain endpoint URL
endpoint_url: str | None = None
if not get_endpoint:
return SdsSearchResults(asid=asid, endpoint=None)
endpoint_bundle = self._query_sds(
ods_code=ods_code,
correlation_id=correlation_id,
timeout=timeout,
querytype=SdsResourceType.ENDPOINT,
)
endpoint = self._extract_first_resource(endpoint_bundle, Endpoint)
if endpoint and endpoint.address:
endpoint_url = str(endpoint.address).strip()
return SdsSearchResults(asid=asid, endpoint=endpoint_url)
def _query_sds(
self,
ods_code: str,
correlation_id: str | None = None,
timeout: int | None = 10,
querytype: SdsResourceType = SdsResourceType.DEVICE,
) -> Bundle:
"""
Query SDS /Device or /Endpoint endpoint.
"""
headers = self._build_headers(correlation_id=correlation_id)
url = f"{self.base_url}/{querytype.value}"
params: dict[str, Any] = {
"organization": f"{FHIRSystem.ODS_CODE}|{ods_code}",
"identifier": [
f"{FHIRSystem.NHS_SERVICE_INTERACTION_ID}|{self.service_interaction_id}"
],
}
log_details = {
"description": "SDS request",
"url": url,
"params": params,
}
_logger.info(log_details)
response = get(
url,
headers=headers,
params=params,
timeout=timeout or self.timeout,
)
log_details = {
"description": "SDS response received",
"status_code": str(response.status_code),
}
_logger.info(log_details)
try:
response.raise_for_status()
except HTTPError as e:
raise SdsRequestFailedError(error_reason=str(e)) from e
bundle = Bundle.model_validate(response.json())
return bundle
@staticmethod
def _extract_first_resource[T: Resource](
bundle: Bundle, resource: type[T]
) -> T | None:
# TODO [GPCAPIM-365]: more carefully consider business logic for handling
# multiple entries in beta
resources = bundle.find_resources(resource)
if not resources:
return None
first_entry = resources[0]
return first_entry
def _extract_device_identifier(self, device: Device, system: str) -> str | None:
"""
Extract an identifier value from a Device resource for a given system.
"""
for identifier in device.identifier:
if identifier.system == system:
return identifier.value
return None