-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathauth.py
More file actions
128 lines (106 loc) · 4.18 KB
/
auth.py
File metadata and controls
128 lines (106 loc) · 4.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import json
import logging
from datetime import datetime, timedelta, timezone
from okdata.sdk.auth.credentials.client_credentials import ClientCredentialsProvider
from okdata.sdk.auth.credentials.common import (
TokenProviderNotInitialized,
TokenRefreshError,
)
from okdata.sdk.auth.credentials.password_grant import TokenServiceProvider
from okdata.sdk.exceptions import ApiAuthenticateError
log = logging.getLogger()
def _is_expired(timestamp):
"""Return true if `timestamp` has expired (or is just about to expire)."""
return timestamp and (timestamp - datetime.now(timezone.utc)).total_seconds() < 10
class Authenticate:
_access_token = None
_refresh_token = None
_expires_at = None
_refresh_expires_at = None
# TODO: Remove keyword argument `file_cache` in a later release.
def __init__(self, config, token_provider=None, file_cache=None):
if file_cache is not None:
log.warning(
"Keyword argument `file_cache` to "
"`okdata.sdk.auth.auth.Authenticate` is deprecated and will "
"be removed in a later release of okdata-sdk."
)
self.token_provider = token_provider
if not self.token_provider:
try:
self.token_provider = next(self._resolve_token_provider(config))
log.info(
f"Found credentials for {self.token_provider.__class__.__name__}"
)
except StopIteration:
log.info("No valid auth strategies available")
def _resolve_token_provider(self, config):
# Add more TokenProviders to accept different login methods
strategies = [ClientCredentialsProvider, TokenServiceProvider]
for strat in strategies:
try:
yield strat(config)
except TokenProviderNotInitialized:
continue
except Exception as e:
log.exception(e)
raise e
# read only
@property
def access_token(self):
if not self.token_provider:
return None
if not self._access_token:
self.login()
# If expired, refresh
if _is_expired(self._expires_at):
self.refresh_access_token()
return self._access_token
def login(self, force=False):
if not self.token_provider:
return
if self._access_token and not _is_expired(self._expires_at):
log.info("Token not expired, skipping")
return
self.refresh_access_token()
def refresh_access_token(self):
if not self.token_provider:
return
tokens = None
if self._refresh_token and not _is_expired(self._refresh_expires_at):
try:
tokens = self.token_provider.refresh_token(self._refresh_token)
except TokenRefreshError as e:
log.warning(f"Error refreshing token: {e}")
if not tokens:
tokens = self.token_provider.new_token()
if "access_token" not in tokens:
raise ApiAuthenticateError
self._refresh_token = tokens.get("refresh_token")
self._expires_at = datetime.now(timezone.utc) + timedelta(
seconds=tokens.get("expires_in")
)
if refresh_expires_in := tokens.get("refresh_expires_in"):
self._refresh_expires_at = datetime.now(timezone.utc) + timedelta(
seconds=refresh_expires_in
)
self._access_token = tokens["access_token"]
def __repr__(self):
return json.dumps(
{
"provider": self.token_provider.__class__.__name__,
"access_token": self._access_token,
"refresh_token": self._refresh_token,
"expires_at": self._expires_at.isoformat() if self._expires_at else "",
"refresh_expires_at": (
self._refresh_expires_at.isoformat()
if self._refresh_expires_at
else ""
),
}
)
def __str__(self):
return self.__repr__()
class Authorize:
def __init__(self):
pass