Coverage for backend/django/core/validation.py: 85%
142 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
1import contextvars
2from contextlib import contextmanager
3from django.apps import apps
4from functools import wraps
5from authentication.user.models import User
6from core.viewset import ModelViewSet, ReadOnlyModelViewSet
7from django.urls.resolvers import URLPattern, URLResolver
8from rest_framework.exceptions import ValidationError
9from rest_framework.request import Request
10from rest_framework.viewsets import ModelViewSet as MDVS
13def sanitize_flowsheet_id(flowsheet_id):
14 """
15 Ensure the client has provided a valid flowsheet id (positive integer)
16 """
17 try:
18 flowsheet_id = int(flowsheet_id)
19 if not flowsheet_id:
20 raise ValidationError("Invalid flowsheet id")
21 if flowsheet_id < 1:
22 raise ValidationError("Invalid flowsheet id")
23 except:
24 raise ValidationError("Invalid flowsheet id")
27def api_view_validate(view_func):
28 """
29 Decorator for every api_view to enforce access control
30 """
32 @wraps(view_func)
33 def _wrapped_view(request: Request, *args, **kwargs):
34 flowsheet_id = request.GET.get("flowsheet")
35 user = request.user
36 sanitize_flowsheet_id(flowsheet_id)
38 with flowsheet_context(flowsheet_id, user):
39 return view_func(request, *args, **kwargs)
40 _wrapped_view._is_api_view_validated = True
41 return _wrapped_view
43def api_view_ignore_access_control(view_func):
44 """
45 Decorator for api_view to ignore access control. This is not for
46 general use. Attach only to internal endpoints/handlers (such as
47 for Dapr to invoke).
48 """
50 @wraps(view_func)
51 def _wrapped_view(request: Request, *args, **kwargs):
52 return view_func(request, *args, **kwargs)
54 _wrapped_view.ignore_access_control = True
55 return _wrapped_view
57################## Context manager ##################
58flowsheet_ctx = contextvars.ContextVar("flowsheet", default=None)
60@contextmanager
61def flowsheet_context(flowsheet: int, user: User):
62 """
63 Sanitize and inject flowsheet and user id as context for each view
64 """
66 data = {
67 "flowsheet": flowsheet,
68 "user": user,
69 "has_access": None
70 }
71 token = flowsheet_ctx.set(data)
73 try:
74 # Allow the view to execute with the context set
75 yield
76 finally:
77 try:
78 flowsheet_ctx.reset(token)
79 except ValueError:
80 # This is to reset the context if the server throws an error while processing the request
81 # So that it doesn't leak to the next request
82 flowsheet_ctx.set(None)
84def get_current_flowsheet():
85 """
86 Get the current flowsheet and user id from the context in format
87 {
88 "flowsheet": flowsheet_id,
89 "user": User,
90 "has_access": has_access
91 }
92 """
93 return flowsheet_ctx.get()
96def cache_result(has_access: bool = False):
97 """
98 Cache the result of the flowsheet context
99 """
100 data = flowsheet_ctx.get()
101 data["has_access"] = has_access
102 flowsheet_ctx.set(data)
104################## Router and urlpattern validation ##################
107def validate_router(router):
108 """
109 Validate that every registered viewset is a subclass of ModelViewSet
110 """
111 exclude_list = ["flowsheets", "flowsheetTemplates", "compounds"]
112 viewsets = [ModelViewSet, ReadOnlyModelViewSet]
113 for (prefix, viewset, basename) in router.registry:
114 if prefix in exclude_list:
115 continue
117 if not any(issubclass(viewset, vs) for vs in viewsets): 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true
118 raise Exception(f"ModelViewSet (from core.viewset) is not being inherited at {prefix}!")
120 if getattr(viewset, "get_queryset", None) == getattr(MDVS, "get_queryset"): 120 ↛ 121line 120 didn't jump to line 121 because the condition on line 120 was never true
121 raise Exception(f"get_queryset is not being overridden at {prefix}! Please override get_queryset method to provide the queryset")
123 if getattr(viewset, "queryset", None) != None: 123 ↛ 124line 123 didn't jump to line 124 because the condition on line 123 was never true
124 raise Exception(f"Please remove queryset from {prefix} viewset and create a get_queryset method instead. \
125 Avoid creating the queryset attribute to enforce access control.")
128def validate_urlpatterns(urlpatterns):
129 """
130 Validate that every view in urlpatterns is decorated with api_view_validate
131 """
132 all_views = extract_views_from_urlpatterns(urlpatterns)
133 for path, view in all_views:
134 if not hasattr(view, '_is_api_view_validated'): 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true
135 raise Exception(f"api_view_validate decorator (from core.validation) is not being used at {path}!!!")
138def extract_views_from_urlpatterns(urlpatterns, base_path=''):
139 """
140 Recursively extracts views from urlpatterns.
141 """
142 views = []
143 for pattern in urlpatterns:
144 if isinstance(pattern, URLPattern): 144 ↛ 147line 144 didn't jump to line 147 because the condition on line 144 was always true
145 path = base_path + str(pattern.pattern)
146 views.append((path, pattern.callback))
147 elif isinstance(pattern, URLResolver): # nested patterns (like routers)
148 nested_path = base_path + str(pattern.pattern)
149 views.extend(extract_views_from_urlpatterns(
150 pattern.url_patterns, nested_path))
151 return views
154# Check that all models have flowsheet attribute and use AccessControlManager or SoftDeleteManager
155def validate_models():
156 from core.managers import AccessControlManager, SoftDeleteManager
157 exclude_models = [
158 'User',
159 'Permission',
160 'Group',
161 'ContentType',
162 'Flowsheet',
163 'AccessTable',
164 'Session',
165 'TaskMeta'
166 ]
168 models = apps.get_models()
171 for model in models:
172 model_name = model.__name__
173 module_name = model.__module__
174 # Skip silk models
175 if module_name.startswith('silk'): 175 ↛ 176line 175 didn't jump to line 176 because the condition on line 175 was never true
176 continue
177 objects = model.objects
178 if model_name in exclude_models:
179 continue
181 if 'flowsheetOwner' in [field.name for field in model._meta.get_fields()]: 181 ↛ 182line 181 didn't jump to line 182 because the condition on line 181 was never true
182 raise ValueError("To enforce access control, `flowsheetOwner` should not be used. Please rename to `flowsheet` instead")
184 has_flowsheet = False
186 assert (
187 isinstance(objects, (AccessControlManager, SoftDeleteManager))
188 ), f"Model {model_name} doesn't have AccessControlManager or SoftDeleteManager (from core.managers). Either is required to enforce access control."
190 # Check if the object has a flowsheet or flowsheet attribute
191 if hasattr(model, 'flowsheet'): 191 ↛ 194line 191 didn't jump to line 194 because the condition on line 191 was always true
192 has_flowsheet = True
194 if hasattr(model, 'flowsheetOwner'): 194 ↛ 195line 194 didn't jump to line 195 because the condition on line 194 was never true
195 raise ValueError("To enforce access control, `flowsheetOwner` should not be used. Please rename to `flowsheet` instead")
197 assert has_flowsheet, f"Models should have flowsheet or flowsheet attribute for handling access control, model: {model_name} doesn't"
199def validate_routers():
200 import os
201 import ast
203 IGNORED_DIRS = {"site-packages"}
204 IGNORED_FILES = {
205 os.path.normpath("./authentication/routers.py")
206 }
208 def is_ignored(path: str) -> bool:
209 norm = os.path.normpath(path)
210 parts = norm.split(os.sep)
212 # Ignore specific directories
213 if any(part in IGNORED_DIRS for part in parts):
214 return True
216 # Ignore specific file paths
217 if norm in IGNORED_FILES:
218 return True
220 return False
222 issues = []
224 for root, _, files in os.walk("./"):
225 if is_ignored(root):
226 continue
228 for file in files:
229 if file == "routers.py":
230 path = os.path.join(root, file)
232 if is_ignored(path):
233 continue
235 with open(path, "r", encoding="utf-8") as f:
236 source = f.read()
238 tree = ast.parse(source)
240 found = False
242 # inspect AST for validate_router(...) calls
243 for node in ast.walk(tree): 243 ↛ 259line 243 didn't jump to line 259 because the loop on line 243 didn't complete
244 if isinstance(node, ast.Call):
245 func = node.func
247 # Extract function name
248 if isinstance(func, ast.Name):
249 name = func.id
250 elif isinstance(func, ast.Attribute): 250 ↛ 253line 250 didn't jump to line 253 because the condition on line 250 was always true
251 name = func.attr
252 else:
253 continue
255 if name == "validate_router":
256 found = True
257 break
259 if not found: 259 ↛ 260line 259 didn't jump to line 260 because the condition on line 259 was never true
260 issues.append(path)
263 if issues: 263 ↛ 264line 263 didn't jump to line 264 because the condition on line 263 was never true
264 issue_list = "\n".join(issues)
265 raise Exception(f"The following routers.py files are missing validate_router(...) calls:\n{issue_list}")