Coverage for backend/django/core/validation.py: 85%
147 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-26 20:57 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-26 20:57 +0000
1import contextvars
2from contextlib import contextmanager
3from django.apps import apps
4from functools import wraps
5from authentication.user.models import User
6from core.viewset import ModelViewSet, ReadOnlyModelViewSet
7from django.urls.resolvers import URLPattern, URLResolver
8from rest_framework.exceptions import ValidationError
9from rest_framework.request import Request
10from rest_framework.viewsets import ModelViewSet as MDVS
11from typing import TypedDict
def sanitize_flowsheet_id(flowsheet_id):
    """
    Ensure the client has provided a valid flowsheet id (positive integer).

    Args:
        flowsheet_id: Raw value supplied by the client; anything ``int()``
            accepts, e.g. ``"5"`` or ``5``.

    Returns:
        The flowsheet id converted to ``int``.

    Raises:
        ValidationError: If the value is missing (``None``), cannot be
            converted to an integer, or is smaller than 1.
    """
    try:
        parsed_id = int(flowsheet_id)
    except (TypeError, ValueError, OverflowError) as err:
        # int(None) raises TypeError, int("abc") raises ValueError and
        # int(float("inf")) raises OverflowError. Only conversion failures are
        # translated, so KeyboardInterrupt/SystemExit are never swallowed.
        raise ValidationError("Invalid flowsheet id") from err

    # Zero and negative ids are rejected by the same bound.
    if parsed_id < 1:
        raise ValidationError("Invalid flowsheet id")
    return parsed_id
def api_view_validate(view_func):
    """
    Decorator for every api_view to enforce access control.

    The wrapper reads the ``flowsheet`` query parameter, validates it with
    ``sanitize_flowsheet_id`` and then runs the view inside
    ``flowsheet_context`` for that flowsheet and the requesting user.

    The returned wrapper carries ``_is_api_view_validated = True``, which is
    the marker ``validate_urlpatterns`` looks for.
    """

    @wraps(view_func)
    def _validated_view(request: Request, *args, **kwargs):
        raw_flowsheet_id = request.GET.get("flowsheet")
        requesting_user = request.user

        # Raises ValidationError before the view runs when the id is unusable.
        sanitize_flowsheet_id(raw_flowsheet_id)

        # The return value of sanitize_flowsheet_id is not used, so the context
        # receives the query parameter exactly as it was read from request.GET.
        with flowsheet_context(raw_flowsheet_id, requesting_user):
            return view_func(request, *args, **kwargs)

    setattr(_validated_view, "_is_api_view_validated", True)
    return _validated_view
def api_view_ignore_access_control(view_func):
    """
    Decorator that marks an api_view as exempt from access control.

    Not for general use: attach it only to internal endpoints/handlers
    (such as those Dapr invokes).

    The wrapper simply forwards the call to ``view_func``; the only effect is
    that the returned callable carries ``ignore_access_control = True``.
    """

    @wraps(view_func)
    def _passthrough_view(request: Request, *args, **kwargs):
        # No flowsheet validation and no flowsheet context: plain delegation.
        response = view_func(request, *args, **kwargs)
        return response

    setattr(_passthrough_view, "ignore_access_control", True)
    return _passthrough_view
59################## Context manager ##################
class FlowsheetContext(TypedDict):
    """Data stored in ``flowsheet_ctx`` while ``flowsheet_context`` is active."""
    # Flowsheet the current request is scoped to, as passed to flowsheet_context.
    flowsheet: int
    # User passed to flowsheet_context (api_view_validate passes request.user).
    user: User
    # Cached access-check outcome: None when flowsheet_context creates the
    # entry, then whatever value cache_result() stores.
    has_access: bool | None
# Context variable holding the active FlowsheetContext. Its default is None,
# which is what readers get when no flowsheet_context() block is active.
flowsheet_ctx = contextvars.ContextVar[FlowsheetContext | None]("flowsheet", default=None)
@contextmanager
def flowsheet_context(flowsheet: int, user: User):
    """
    Publish the flowsheet id and user through ``flowsheet_ctx`` for the
    duration of the ``with`` block.

    While the body runs, ``flowsheet_ctx`` holds a ``FlowsheetContext`` whose
    ``has_access`` is ``None``. The value is restored on exit, whether the
    body finished normally or raised. No validation of ``flowsheet`` happens
    here; the value is stored as given.
    """
    ctx_token = flowsheet_ctx.set(
        FlowsheetContext(flowsheet=flowsheet, user=user, has_access=None)
    )

    try:
        # Hand control to the caller's block with the context in place.
        yield
    finally:
        try:
            flowsheet_ctx.reset(ctx_token)
        except ValueError:
            # The token could not be used to restore the previous value (for
            # example after the server errored mid-request). Clear the variable
            # instead so the data cannot leak into the next request.
            flowsheet_ctx.set(None)
def get_current_flowsheet():
    """
    Return whatever ``flowsheet_ctx`` currently holds.

    Inside a ``flowsheet_context`` block that is a dict of the form
    {
        "flowsheet": flowsheet_id,
        "user": User,
        "has_access": has_access
    }
    Outside of one it is the variable's default, ``None``.
    """
    active_context = flowsheet_ctx.get()
    return active_context
def cache_result(has_access: bool = False):
    """
    Cache the access-check result on the active flowsheet context.

    Args:
        has_access: Outcome to store under the ``"has_access"`` key.

    When no flowsheet context is active (``flowsheet_ctx`` holds ``None``)
    there is nothing to cache and the call is a no-op. Previously this case
    failed with ``TypeError: 'NoneType' object does not support item
    assignment``.
    """
    data = flowsheet_ctx.get()
    if data is None:
        # Not caching is the safe direction: has_access simply stays unknown.
        return

    # The dict is updated in place and then stored again, as before.
    data["has_access"] = has_access
    flowsheet_ctx.set(data)
111################## Router and urlpattern validation ##################
def validate_router(router):
    """
    Validate that every registered viewset is a subclass of ModelViewSet.

    For each ``(prefix, viewset, basename)`` entry in ``router.registry``
    whose prefix is not excluded, three rules are enforced:

    1. the viewset subclasses ``ModelViewSet`` or ``ReadOnlyModelViewSet``
       (from core.viewset);
    2. its ``get_queryset`` is not the one inherited unchanged from
       rest_framework's ``ModelViewSet``;
    3. it defines no ``queryset`` attribute.

    Args:
        router: Object exposing a ``registry`` iterable of
            ``(prefix, viewset, basename)`` tuples.

    Raises:
        Exception: On the first viewset that breaks one of the rules.
    """
    exclude_list = {"flowsheets", "flowsheetTemplates", "compounds"}

    for prefix, viewset, _basename in router.registry:
        if prefix in exclude_list:
            continue

        if not issubclass(viewset, (ModelViewSet, ReadOnlyModelViewSet)):
            raise Exception(f"ModelViewSet (from core.viewset) is not being inherited at {prefix}!")

        if getattr(viewset, "get_queryset", None) == getattr(MDVS, "get_queryset"):
            raise Exception(f"get_queryset is not being overridden at {prefix}! Please override get_queryset method to provide the queryset")

        # Identity check: `!= None` would go through the attribute's __eq__/__ne__.
        if getattr(viewset, "queryset", None) is not None:
            # Adjacent literals instead of a backslash continuation, which
            # embedded the source indentation in the message.
            raise Exception(
                f"Please remove queryset from {prefix} viewset and create a get_queryset method instead. "
                "Avoid creating the queryset attribute to enforce access control."
            )
def validate_urlpatterns(urlpatterns):
    """
    Validate that every view in urlpatterns is decorated with api_view_validate.

    Views are collected with ``extract_views_from_urlpatterns``; the first one
    whose callback lacks the ``_is_api_view_validated`` marker raises an
    Exception naming its path.
    """
    # NOTE(review): only `_is_api_view_validated` is checked here; a view marked
    # solely with `ignore_access_control` would be rejected. Confirm that such
    # views are never passed to this function.
    for route, callback in extract_views_from_urlpatterns(urlpatterns):
        if hasattr(callback, '_is_api_view_validated'):
            continue
        raise Exception(f"api_view_validate decorator (from core.validation) is not being used at {route}!!!")
def extract_views_from_urlpatterns(urlpatterns, base_path=''):
    """
    Recursively collect ``(path, callback)`` pairs from urlpatterns.

    ``URLPattern`` entries contribute one pair each; ``URLResolver`` entries
    (nested patterns, like routers) are expanded depth-first with their own
    pattern appended to ``base_path``. Entries of any other type are skipped.
    """
    collected = []
    for entry in urlpatterns:
        if isinstance(entry, URLPattern):
            collected.append((base_path + str(entry.pattern), entry.callback))
            continue

        if isinstance(entry, URLResolver):
            prefix = base_path + str(entry.pattern)
            collected += extract_views_from_urlpatterns(entry.url_patterns, prefix)

    return collected
def validate_models():
    """
    Check that all models have a flowsheet attribute and use
    AccessControlManager or SoftDeleteManager.

    Every model returned by ``apps.get_models()`` is checked, except models
    from ``silk`` modules and those named in the exclusion set.

    Raises:
        ValueError: If a model has a field or attribute named
            ``flowsheetOwner``.
        AssertionError: If a model's ``objects`` manager is not an
            ``AccessControlManager``/``SoftDeleteManager`` (or is missing),
            or if the model has no ``flowsheet`` attribute. These are raised
            explicitly rather than with ``assert`` so the checks still run
            under ``python -O``.
    """
    # Imported here rather than at module level, as in the original code.
    from core.managers import AccessControlManager, SoftDeleteManager

    exclude_models = {
        'User',
        'Permission',
        'Group',
        'ContentType',
        'Flowsheet',
        'AccessTable',
        'Session',
        'TaskMeta',
    }

    for model in apps.get_models():
        model_name = model.__name__

        # Skip silk models
        if model.__module__.startswith('silk'):
            continue
        if model_name in exclude_models:
            continue

        if any(field.name == 'flowsheetOwner' for field in model._meta.get_fields()):
            raise ValueError("To enforce access control, `flowsheetOwner` should not be used. Please rename to `flowsheet` instead")

        # Read the manager only for models that are actually validated, and
        # tolerate its absence so the failure below names the model.
        manager = getattr(model, 'objects', None)
        if not isinstance(manager, (AccessControlManager, SoftDeleteManager)):
            raise AssertionError(
                f"Model {model_name} doesn't have AccessControlManager or SoftDeleteManager (from core.managers). Either is required to enforce access control."
            )

        if hasattr(model, 'flowsheetOwner'):
            raise ValueError("To enforce access control, `flowsheetOwner` should not be used. Please rename to `flowsheet` instead")

        if not hasattr(model, 'flowsheet'):
            raise AssertionError(
                f"Models should have flowsheet or flowsheet attribute for handling access control, model: {model_name} doesn't"
            )
def _source_calls_validate_router(source: str) -> bool:
    """Return True if *source* contains a call to something named ``validate_router``."""
    import ast

    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue

        func = node.func
        # Accept both `validate_router(...)` and `module.validate_router(...)`.
        if isinstance(func, ast.Name):
            name = func.id
        elif isinstance(func, ast.Attribute):
            name = func.attr
        else:
            continue

        if name == "validate_router":
            return True
    return False


def validate_routers(base_dir: str = "./"):
    """
    Ensure every ``routers.py`` under *base_dir* calls ``validate_router(...)``.

    Args:
        base_dir: Directory to scan. Defaults to ``"./"``, i.e. the current
            working directory, which is what the function always scanned.

    Directories named ``site-packages`` are not descended into, and
    ``authentication/routers.py`` (relative to *base_dir*) is exempt.

    Raises:
        Exception: Listing every ``routers.py`` with no ``validate_router``
            call.
        SyntaxError: Propagated from ``ast.parse`` if a ``routers.py`` cannot
            be parsed.
    """
    import os

    ignored_dirs = {"site-packages"}
    ignored_files = {os.path.normpath("authentication/routers.py")}

    issues = []

    for root, dirs, files in os.walk(base_dir):
        # Prune in place so os.walk never enters ignored directories, instead
        # of visiting and skipping every directory beneath them.
        dirs[:] = [d for d in dirs if d not in ignored_dirs]

        if "routers.py" not in files:
            continue

        path = os.path.join(root, "routers.py")
        if os.path.normpath(os.path.relpath(path, base_dir)) in ignored_files:
            continue

        with open(path, "r", encoding="utf-8") as f:
            source = f.read()

        if not _source_calls_validate_router(source):
            issues.append(path)

    if issues:
        issue_list = "\n".join(issues)
        raise Exception(f"The following routers.py files are missing validate_router(...) calls:\n{issue_list}")