|
def _init_sigv4(self, session: Session) -> None:
    """Mount an adapter on ``session`` that SigV4-signs requests sent to ``self.uri``.

    The AWS and HTTP libraries are imported inside this function, so they are
    only loaded when SigV4 signing is actually initialised.

    Args:
        session: The HTTP session on which the signing adapter is mounted.
    """
    from urllib import parse

    import boto3
    from botocore.auth import SigV4Auth
    from botocore.awsrequest import AWSRequest
    from botocore.exceptions import NoCredentialsError
    from requests import PreparedRequest
    from requests.adapters import HTTPAdapter

    class SigV4Adapter(HTTPAdapter):
        """HTTP adapter that adds AWS Signature Version 4 headers to each request."""

        def __init__(self, **properties: str):
            """Create the adapter and the boto3 session used to resolve credentials.

            Args:
                **properties: Catalog properties; the retry count, AWS profile,
                    region and static credentials are read from them.
            """
            self._properties = properties
            max_retries = property_as_int(self._properties, SIGV4_MAX_RETRIES, SIGV4_MAX_RETRIES_DEFAULT)
            super().__init__(max_retries=max_retries)
            self._boto_session = boto3.Session(
                profile_name=get_first_property_value(self._properties, AWS_PROFILE_NAME),
                region_name=get_first_property_value(self._properties, AWS_REGION),
                botocore_session=self._properties.get(BOTOCORE_SESSION),
                aws_access_key_id=get_first_property_value(self._properties, AWS_ACCESS_KEY_ID),
                aws_secret_access_key=get_first_property_value(self._properties, AWS_SECRET_ACCESS_KEY),
                aws_session_token=get_first_property_value(self._properties, AWS_SESSION_TOKEN),
            )

        def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None:  # pylint: disable=W0613
            """Sign ``request`` in place by adding the SigV4 headers.

            Args:
                request: The prepared request about to be sent.
                **kwargs: Accepted for interface compatibility; unused.

            Raises:
                NoCredentialsError: If the boto3 session cannot resolve any credentials.
            """
            # get_credentials() returns None when nothing resolves; fail with a
            # clear error instead of an AttributeError on None.
            session_credentials = self._boto_session.get_credentials()
            if session_credentials is None:
                raise NoCredentialsError()
            credentials = session_credentials.get_frozen_credentials()

            region = self._properties.get(SIGV4_REGION, self._boto_session.region_name)
            service = self._properties.get(SIGV4_SERVICE, "execute-api")

            raw_url = str(request.url)
            url = raw_url.split("?")[0]
            query = str(parse.urlsplit(raw_url).query)
            # Keep blank-valued parameters (e.g. "?parent="): they are part of the
            # URL that is sent, so they must also be part of what is signed.
            params = dict(parse.parse_qsl(query, keep_blank_values=True))

            # remove the connection header as it will be updated after signing
            request.headers.pop("connection", None)
            # For empty bodies, explicitly set the content hash header to the SHA256 of an empty string
            if not request.body:
                request.headers["x-amz-content-sha256"] = EMPTY_BODY_SHA256

            aws_request = AWSRequest(
                method=request.method, url=url, params=params, data=request.body, headers=dict(request.headers)
            )

            SigV4Auth(credentials, service, region).add_auth(aws_request)
            signed_headers = aws_request.headers

            # relocate headers if there is a conflict with signed headers
            relocated_headers = {
                f"Original-{header}": value
                for header, value in request.headers.items()
                if header in signed_headers and signed_headers[header] != value
            }

            request.headers.update(relocated_headers)
            request.headers.update(signed_headers)

    session.mount(self.uri, SigV4Adapter(**self.properties))
iceberg-python/pyiceberg/catalog/rest/__init__.py
Lines 764 to 820 in 8055a60
into new file
pyiceberg/catalog/rest/sigv4.py and generally improve the SigV4 implementation