-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy path__init__.py
More file actions
158 lines (136 loc) · 5.24 KB
/
__init__.py
File metadata and controls
158 lines (136 loc) · 5.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# from flask import Flask
# app = Flask(__name__)
# @app.route("/api")
# def hello():
# print('******************* route hit')
# return "Hello, Flask!"
# if __name__ == "secureFlaskApp":
# print('******************* init')
# app.run()
import json
from six.moves.urllib.request import urlopen
from functools import wraps
from flask import Flask, request, jsonify, _request_ctx_stack
from flask_cors import cross_origin
from jose import jwt
# Flask application object; the routes and the AuthError handler below are
# registered on it.
app = Flask(__name__)
# Value passed as the expected ``audience`` when decoding access tokens.
# NOTE(review): "<tenantname>" looks like a placeholder to be replaced.
API_AUDIENCE = "https://funcapi.<tenantname>.onmicrosoft.com/user_impersonation"
# Tenant identifier used to build the signing-key URL and the expected issuer.
# NOTE(review): "<tenantid>" looks like a placeholder to be replaced.
TENANT_ID = "<tenantid>"
# Error handler
class AuthError(Exception):
    """Authentication failure carrying a JSON-able payload and an HTTP status.

    Attributes are set directly from the constructor arguments:
        error: the payload describing the failure (the callers in this module
            pass a dict with "code" and "description" keys).
        status_code: the HTTP status code to send with the payload.
    """

    def __init__(self, error, status_code):
        self.error, self.status_code = error, status_code
@app.errorhandler(AuthError)
def handle_auth_error(ex):
    """Turn an AuthError into a JSON response.

    The body is ``ex.error`` serialized with ``jsonify`` and the HTTP status
    is ``ex.status_code``.
    """
    print('handling error')
    reply = jsonify(ex.error)
    reply.status_code = ex.status_code
    return reply
# Helpers for reading and validating the bearer access token
def get_token_auth_header():
    """Return the access token from the request's Authorization header.

    The header must have the form ``Bearer <token>`` (the scheme is matched
    case-insensitively).

    Returns:
        The token string, i.e. the second whitespace-separated part of the
        header value.

    Raises:
        AuthError: with status 401 when the header is missing, does not start
            with "Bearer", carries no token, or has more than two parts.
    """
    auth = request.headers.get("Authorization", None)
    if not auth:
        raise AuthError({"code": "authorization_header_missing",
                         "description":
                             "Authorization header is expected"}, 401)

    parts = auth.split()
    # A whitespace-only header value is truthy but splits to an empty list;
    # check for that before indexing so it yields a 401 rather than an
    # IndexError.
    if not parts or parts[0].lower() != "bearer":
        raise AuthError({"code": "invalid_header",
                         "description":
                             "Authorization header must start with"
                             " Bearer"}, 401)
    if len(parts) == 1:
        raise AuthError({"code": "invalid_header",
                         "description": "Token not found"}, 401)
    if len(parts) > 2:
        raise AuthError({"code": "invalid_header",
                         "description":
                             "Authorization header must be"
                             " Bearer token"}, 401)
    return parts[1]
def requires_auth(f):
    """Decorate a view so it only runs when the request has a valid token.

    For each request the wrapper:
      1. reads the bearer token via ``get_token_auth_header``;
      2. downloads the tenant's signing keys and picks the one whose ``kid``
         matches the token's unverified header;
      3. verifies the token with ``jwt.decode`` (RS256, ``API_AUDIENCE`` as
         audience, the tenant's ``sts.windows.net`` URL as issuer);
      4. stores the decoded payload on ``_request_ctx_stack.top.current_user``
         and calls the wrapped view.

    Args:
        f: the view function to protect.

    Returns:
        The wrapped function.

    Raises:
        AuthError: (from the wrapper, status 401) when the header is invalid,
            the keys or token header cannot be read, no matching key exists,
            the token is expired, its claims are wrong, or decoding fails.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        # Read the token outside the try block below: AuthError is itself an
        # Exception, so inside it the specific header error (for example
        # "authorization_header_missing") would be replaced by the generic
        # "Unable to parse authentication token" error.
        token = get_token_auth_header()

        try:
            jsonurl = urlopen("https://login.microsoftonline.com/" +
                              TENANT_ID + "/discovery/v2.0/keys")
            try:
                jwks = json.loads(jsonurl.read())
            finally:
                # Release the HTTP response even if reading/parsing fails.
                jsonurl.close()

            unverified_header = jwt.get_unverified_header(token)
            rsa_key = {}
            for key in jwks["keys"]:
                if key["kid"] == unverified_header["kid"]:
                    rsa_key = {
                        "kty": key["kty"],
                        "kid": key["kid"],
                        "use": key["use"],
                        "n": key["n"],
                        "e": key["e"],
                    }
                    # First matching key wins; no need to scan the rest.
                    break
        except Exception:
            raise AuthError({"code": "invalid_header",
                             "description":
                                 "Unable to parse authentication"
                                 " token."}, 401)

        if not rsa_key:
            raise AuthError({"code": "invalid_header",
                             "description":
                                 "Unable to find appropriate key"}, 401)

        try:
            payload = jwt.decode(
                token,
                rsa_key,
                algorithms=["RS256"],
                audience=API_AUDIENCE,
                issuer="https://sts.windows.net/" + TENANT_ID + "/"
            )
        except jwt.ExpiredSignatureError:
            raise AuthError({"code": "token_expired",
                             "description": "token is expired"}, 401)
        except jwt.JWTClaimsError:
            # The two literals used to concatenate without a space
            # ("incorrect claims,please ..."); the space is now included.
            raise AuthError({"code": "invalid_claims",
                             "description":
                                 "incorrect claims, "
                                 "please check the audience and issuer"}, 401)
        except Exception:
            raise AuthError({"code": "invalid_header",
                             "description":
                                 "Unable to parse authentication"
                                 " token."}, 401)

        # Expose the verified claims to the view for the rest of the request.
        _request_ctx_stack.top.current_user = payload
        return f(*args, **kwargs)

    return decorated
def requires_scope(required_scope):
    """Determine whether the required scope is present in the access token.

    The token is read from the current request's Authorization header and its
    claims are read WITHOUT signature verification, so the result is only
    meaningful when the token has already been validated for this request.

    Args:
        required_scope (str): The scope required to access the resource.

    Returns:
        True if ``required_scope`` is one of the space-separated scopes in the
        token's scope claim, otherwise False.

    Raises:
        AuthError: propagated from ``get_token_auth_header`` when the
            Authorization header is missing or malformed.
    """
    token = get_token_auth_header()
    unverified_claims = jwt.get_unverified_claims(token)
    # "scope" is checked first (the original behaviour). Azure AD access
    # tokens carry delegated scopes in the "scp" claim instead, so fall back
    # to it; otherwise this check could never succeed for those tokens.
    scope_claim = unverified_claims.get("scope") or unverified_claims.get("scp")
    if not scope_claim:
        return False
    return required_scope in scope_claim.split()
# Controllers API
# This doesn't need authentication
@app.route("/public")
@cross_origin(headers=['Content-Type', 'Authorization'])
def public():
    """Serve a fixed JSON message; this route performs no token check."""
    return jsonify(message="Public endpoint - open to all")
# This needs authentication
@app.route("/api")
@cross_origin(headers=['Content-Type', 'Authorization'])
@requires_auth
def private():
    """Return, as JSON, the token payload that ``requires_auth`` stored on
    the request context for this request.
    """
    claims = _request_ctx_stack.top.current_user
    return jsonify(message=claims)
# Start the Flask server only when this file is executed directly, not when
# it is imported.
if __name__ == '__main__':
    app.run()