Coverage for backend/core/auxiliary/viewsets/SolveValueViewSet.py: 21%
155 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1from core.viewset import ModelViewSet
2from rest_framework.response import Response
3from core.auxiliary.models.SolveState import SolveValue, SolveState
4from core.auxiliary.models.Solution import Solution
5from core.auxiliary.models.Scenario import Scenario
6from core.auxiliary.serializers.SolveStateSerializer import SolveValueSerializer, SolveStateSerializer
7from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes
8from rest_framework.decorators import action
9from rest_framework import serializers
10from core.pagination import ViewSetPagination
13class TableSerializer(serializers.ListSerializer):
14 child = serializers.DictField()
17class ResultTableSerializer(serializers.Serializer):
18 input = serializers.ListField(
19 child=serializers.DictField()
20 )
21 output = serializers.ListField(
22 child=serializers.DictField()
23 )
25class SolveValueViewSet(ModelViewSet):
27 serializer_class = SolveValueSerializer
29 # Filter to only show values for a specific property (propertyInfo)
30 def get_queryset(self):
31 queryset = SolveValue.objects.all()
32 expression_id = self.request.query_params.get("expression")
33 if expression_id is not None:
34 queryset = queryset.filter(expression_id=expression_id)
35 return queryset
37 @extend_schema(
38 parameters=[
39 OpenApiParameter(name="expression", required=True,
40 type=OpenApiTypes.INT),
41 ]
42 )
43 def list(self, request):
44 return super().list(request)
47class SolveStateViewSet(ModelViewSet):
48 serializer_class = SolveStateSerializer
49 pagination_class = ViewSetPagination
51 def get_queryset(self):
52 queryset = SolveState.objects.all()
53 optimization = self.request.query_params.get("optimization")
54 if optimization is not None:
55 queryset = queryset.filter(scenario_id=optimization)
56 return queryset
58 @extend_schema(
59 parameters=[
60 OpenApiParameter(name="optimization",
61 required=True, type=OpenApiTypes.INT),
62 ]
63 )
64 def list(self, request):
65 return super().list(request)
67 @extend_schema(
68 parameters=[
69 OpenApiParameter(name="optimization",
70 required=True, type=OpenApiTypes.INT),
71 ],
72 responses={200: OpenApiTypes.INT},
73 )
74 @action(detail=False, methods=['get'], url_path='count')
75 def solve_state_count(self, request):
76 queryset = SolveState.objects.filter(
77 scenario_id=request.query_params.get("optimization"))
78 count = queryset.count()
79 return Response(count)
81 @extend_schema(
82 parameters=[
83 OpenApiParameter(name="scenario", required=True,
84 type=OpenApiTypes.INT),
85 ],
86 responses=TableSerializer()
87 )
88 @action(detail=False, methods=['get'], url_path='full-table')
89 def full_input_table(self, request):
90 scenario_id = request.query_params.get("scenario")
92 if not scenario_id:
93 return Response([], status=400)
95 try:
96 scenario = Scenario.objects.get(id=scenario_id)
97 except Scenario.DoesNotExist:
98 return Response([], status=404)
100 is_dynamic = scenario.enable_dynamics
102 rows_qs = SolveState.objects.filter(
103 scenario=scenario).order_by("index")
104 paginated_rows = self.paginate_queryset(rows_qs)
105 if not paginated_rows:
106 return self.get_paginated_response([])
108 solve_ids = [r.id for r in paginated_rows]
109 solve_index_map = {r.id: r.index for r in paginated_rows}
111 # 2️⃣ Get all SolveValues for those solves in ONE query
112 solve_values_qs = (
113 SolveValue.objects
114 .filter(solve_id__in=solve_ids)
115 .select_related("expression") # avoid N+1 lookups
116 )
118 # 3️⃣ Build a set of all columns (distinct expression names)
119 columns = sorted({sv.expression.name for sv in solve_values_qs})
121 # 4️⃣ Pre-group SolveValues in memory by solve_id
122 values_by_solve = {}
123 for sv in solve_values_qs:
124 idx = solve_index_map[sv.solve_id]
125 if idx not in values_by_solve:
126 values_by_solve[idx] = {}
127 values_by_solve[idx][sv.expression.name] = sv.value
129 # 5️⃣ Build the table data
130 data = []
131 for idx in sorted(solve_index_map.values()):
132 if is_dynamic:
133 row_data = {"Time step": idx}
134 else:
135 row_data = {"Row": idx}
136 for col in columns:
137 row_data[col] = values_by_solve.get(idx, {}).get(col)
138 data.append(row_data)
140 response = self.get_paginated_response(TableSerializer(data).data)
142 return response
144 @extend_schema(
145 parameters=[
146 OpenApiParameter(name="scenario", required=True,
147 type=OpenApiTypes.INT),
148 OpenApiParameter(name="solve_index", required=True,
149 type=OpenApiTypes.INT),
150 ],
151 responses=TableSerializer()
152 )
153 @action(detail=False, methods=['get'], url_path='input-row')
154 def input_row(self, request):
155 scenario_id = request.query_params.get("scenario")
156 solve_index = request.query_params.get("solve_index")
158 if not scenario_id or not solve_index:
159 return Response([], status=400)
161 try:
162 scenario = Scenario.objects.get(id=scenario_id)
163 except Scenario.DoesNotExist:
164 return Response([], status=404)
166 is_dynamic = scenario.enable_dynamics
168 solve_state = SolveState.objects.get(
169 scenario=scenario, index=solve_index)
171 # Pull all values for this solve in one query
172 solve_values_qs = SolveValue.objects.filter(
173 solve_id=solve_state.id
174 ).select_related("expression")
176 # Build a dictionary {column_name: value} in one pass
177 input_value_map = {
178 sv.expression.name: sv.value for sv in solve_values_qs}
180 # Build final row
181 if is_dynamic:
182 input_row_data = {"Time step": solve_state.index}
183 else:
184 input_row_data = {"Row": solve_state.index}
186 input_row_data.update(input_value_map)
188 return Response(TableSerializer([input_row_data]).data)
190 @extend_schema(
191 parameters=[
192 OpenApiParameter(name="scenario", required=True,
193 type=OpenApiTypes.INT),
194 OpenApiParameter(name="solve_index", required=True,
195 type=OpenApiTypes.INT),
196 OpenApiParameter(name="simulation_object",
197 required=True, type=OpenApiTypes.INT),
198 ],
199 responses=TableSerializer()
200 )
201 @action(detail=False, methods=['get'], url_path='output-row')
202 def output_row(self, request):
203 scenario_id = request.query_params.get("scenario")
204 solve_index = request.query_params.get("solve_index")
205 simulation_object = request.query_params.get("simulation_object")
207 if not scenario_id or not solve_index or not simulation_object:
208 return Response([], status=400)
210 solve_index = int(solve_index)
212 try:
213 scenario = Scenario.objects.get(id=scenario_id)
214 except Scenario.DoesNotExist:
215 return Response([], status=404)
217 is_dynamic = scenario.enable_dynamics
219 solutions = Solution.objects.filter(
220 scenario=scenario,
221 property__property__set__simulationObject_id=simulation_object,
222 ).select_related("property").order_by("solve_index")
224 if not is_dynamic:
225 solutions = solutions.filter(solve_index=solve_index)
226 # Build a dictionary {column_name: value} in one pass
227 output_value_map = {
228 sol.property.property.displayName: sol.values[0] for sol in solutions}
229 else:
230 output_value_map = {}
231 for sol in solutions:
232 # uploaded more rows than actual time step ->
233 # if access solve index where time step doesn't exist
234 if solve_index < len(sol.values):
235 output_value_map[sol.property.property.displayName] = sol.values[solve_index]
237 return Response(TableSerializer([output_value_map]).data)
239 @extend_schema(
240 parameters=[
241 OpenApiParameter(name="scenario", required=True,
242 type=OpenApiTypes.INT),
243 OpenApiParameter(name="simulation_object",
244 required=True, type=OpenApiTypes.INT),
245 ],
246 responses=TableSerializer()
247 )
248 @action(detail=False, methods=['get'], url_path='output-graph')
249 def output_graph(self, request):
250 scenario_id = request.query_params.get("scenario")
251 simulation_object = request.query_params.get("simulation_object")
253 try:
254 scenario = Scenario.objects.get(id=scenario_id)
255 except Scenario.DoesNotExist:
256 return Response([], status=404)
258 is_dynamic = scenario.enable_dynamics
260 if not scenario_id or not simulation_object:
261 return Response([], status=400)
263 solutions = Solution.objects.filter(
264 scenario_id=scenario_id,
265 property__property__set__simulationObject_id=simulation_object
266 ).select_related("property", "property__property").prefetch_related("property__indexedItems").order_by("solve_index")
268 output_solutions = []
269 if is_dynamic:
270 for sol in solutions:
271 property_value = sol.property
272 indexItem = property_value.indexedItems.first()
273 key = f"{property_value.property.displayName} {indexItem.displayName}" if indexItem else property_value.property.displayName
274 arr = []
275 for i, value in enumerate(sol.values):
276 arr.append({
277 "index": i,
278 "key": key,
279 "value": value
280 })
281 output_solutions.append(arr)
282 else:
283 prop_key_mapping = {}
284 for sol in solutions:
285 property_value = sol.property
286 indexItem = property_value.indexedItems.first()
287 key = f"{property_value.property.displayName} {indexItem.displayName}" if indexItem else property_value.property.displayName
289 if key not in prop_key_mapping:
290 prop_key_mapping[key] = []
292 prop_key_mapping[key].append({
293 "index": sol.solve_index,
294 "key": key,
295 "value": sol.values[0] # for mss, values has only one value
296 })
297 output_solutions = list(prop_key_mapping.values())
299 return Response(TableSerializer(output_solutions, many=True).data)