-
Notifications
You must be signed in to change notification settings - Fork 68
Expand file tree
/
Copy pathmfa.py
More file actions
127 lines (115 loc) · 5.03 KB
/
mfa.py
File metadata and controls
127 lines (115 loc) · 5.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import json
from allauth.headless.mfa.views import AuthenticateView
from rest_framework.views import APIView
from zango.apps.appauth.tasks import send_otp
from zango.core.api import get_api_response
from zango.core.utils import get_auth_priority, mask_email, mask_phone_number
class GetMFACodeViewAPIV1(APIView):
def get_user(self, username):
from django.db.models import Q
from zango.apps.appauth.models import AppUserModel
try:
return AppUserModel.objects.get(Q(email=username) | Q(mobile=username))
except AppUserModel.DoesNotExist:
return None
def get(self, request, *args, **kwargs):
policy = get_auth_priority(policy="two_factor_auth", request=request)
if not policy.get("required"):
return get_api_response(
success=False,
response_content={"message": "MFA not required"},
status=400,
)
else:
allowed_methods = policy.get("allowed_methods", [])
if len(allowed_methods) == 0:
return get_api_response(
success=False,
response_content={"message": "No MFA methods configured"},
status=400,
)
if len(request.session.get("account_authentication_methods", [])) > 0:
latest_auth_method = request.session["account_authentication_methods"][
0
]
preferred_method = None
request_data = {
"path": request.path,
"params": request.GET,
}
user = None
if latest_auth_method.get("email"):
user = self.get_user(latest_auth_method.get("email"))
if user is None:
return get_api_response(
success=False,
response_content={"message": "User not found"},
status=400,
)
preferred_method = "sms"
if preferred_method not in allowed_methods:
return get_api_response(
success=False,
response_content={"message": "SMS MFA method not allowed"},
status=400,
)
send_otp.delay(
method=preferred_method,
otp_type="two_factor_auth",
user_id=user.id,
tenant_id=request.tenant.id,
request_data=request_data,
user_role_id=request.session.get("role_id"),
message="Your 2FA code is",
)
else:
user = self.get_user(latest_auth_method.get("phone"))
if user is None:
return get_api_response(
success=False,
response_content={"message": "User not found"},
status=400,
)
preferred_method = "email"
if preferred_method not in allowed_methods:
return get_api_response(
success=False,
response_content={
"message": "Email MFA method not allowed"
},
status=400,
)
send_otp.delay(
method=preferred_method,
otp_type="two_factor_auth",
user_id=user.id,
tenant_id=request.tenant.id,
request_data=request_data,
user_role_id=request.session.get("role_id"),
message="Your 2FA code is",
subject="2FA Verification Code",
)
return get_api_response(
success=True,
response_content={
"message": f"Verification code sent to {preferred_method}",
"masked_destination": mask_email(user.email)
if preferred_method == "email"
else mask_phone_number(str(user.mobile)),
},
status=200,
)
else:
return get_api_response(
success=False,
response_content={"message": "User not authenticated"},
status=400,
)
class MFAVerifyViewAPIV1(AuthenticateView):
def post(self, request, *args, **kwargs):
resp = super().post(request, *args, **kwargs)
return get_api_response(
success=True,
response_content=json.loads(resp.content.decode("utf-8")),
status=resp.status_code,
)