Coverage for backend/django/core/auxiliary/viewsets/DataRowViewSet.py: 80%
136 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-02-12 01:47 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-02-12 01:47 +0000
1from core.viewset import ModelViewSet
2from rest_framework.response import Response
3from core.auxiliary.models.DataCell import DataCell
4from core.auxiliary.models.DataRow import DataRow
5from core.auxiliary.models.Solution import Solution
6from core.auxiliary.models.Scenario import Scenario
7from core.auxiliary.serializers.DataRowSerializer import DataRowSerializer
8from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes
9from rest_framework.decorators import action
10from rest_framework import serializers
11from core.pagination import ViewSetPagination
class TableSerializer(serializers.ListSerializer):
    """List serializer whose items are free-form dictionaries.

    The table endpoints of ``DataRowViewSet`` build their rows as plain
    dicts whose keys depend on the data being returned, so no fixed field
    set is declared here.
    """

    # Every list element is rendered as a dictionary.
    child = serializers.DictField()
class DataRowViewSet(ModelViewSet):
    """Data rows of a scenario, plus table- and graph-shaped views of their values."""

    serializer_class = DataRowSerializer
    pagination_class = ViewSetPagination

    # ------------------------------------------------------------------
    # Private helpers shared by the custom actions
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_int(raw):
        """Return *raw* converted to ``int``, or ``None`` if it is not a valid integer."""
        try:
            return int(raw)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _lookup_scenario(scenario_id):
        """Return the scenario with primary key *scenario_id*, or ``None``."""
        try:
            return Scenario.objects.get(id=scenario_id)
        except (Scenario.DoesNotExist, ValueError):
            # ValueError: the id is not a valid integer key. Such an id
            # cannot match any scenario, so report it as "not found"
            # instead of letting the request fail with a server error.
            return None

    @staticmethod
    def _index_column_label(scenario):
        """Return the heading used for the row-index column of *scenario*."""
        return "Time step" if scenario.enable_dynamics else "Row"

    @staticmethod
    def _series_key(property_value):
        """Return the label of a graph series for *property_value*.

        The label is the property's display name, followed by the display
        name of its first indexed item when there is one.
        """
        # NOTE(review): ``first()`` may issue its own ordered query per call
        # and so bypass the ``prefetch_related("property__indexedItems")``
        # cache set up by the caller -- confirm against the model's ordering.
        indexed_item = property_value.indexedItems.first()
        name = property_value.property.displayName
        if indexed_item:
            return f"{name} {indexed_item.displayName}"
        return name

    # ------------------------------------------------------------------
    # Standard endpoints
    # ------------------------------------------------------------------

    def get_queryset(self):
        """Return all data rows, narrowed to the ``scenario`` query parameter if given."""
        queryset = DataRow.objects.all()
        scenario = self.request.query_params.get("scenario")
        if scenario is not None:
            queryset = queryset.filter(scenario_id=scenario)
        return queryset

    @extend_schema(
        parameters=[
            OpenApiParameter(name="scenario",
                             required=True, type=OpenApiTypes.INT),
        ]
    )
    def list(self, request, *args, **kwargs):
        """List the data rows selected by the ``scenario`` query parameter."""
        # Forward positional/keyword URL arguments so the override keeps the
        # same calling convention as the handler it wraps.
        return super().list(request, *args, **kwargs)

    # ------------------------------------------------------------------
    # Custom actions
    # ------------------------------------------------------------------

    @extend_schema(
        parameters=[
            OpenApiParameter(name="scenario", required=True,
                             type=OpenApiTypes.INT),
        ],
        responses=TableSerializer()
    )
    @action(detail=False, methods=['get'], url_path='full-table')
    def full_input_table(self, request):
        """Return one page of a scenario's input table.

        Each item is one data row: its index under ``Time step`` (dynamic
        scenarios) or ``Row``, followed by one entry per data column found
        on the page. Cells without a value are ``null``.

        Responds with 400 if ``scenario`` is missing and 404 if no such
        scenario exists.
        """
        scenario_id = request.query_params.get("scenario")
        if not scenario_id:
            return Response([], status=400)

        scenario = self._lookup_scenario(scenario_id)
        if scenario is None:
            return Response([], status=404)

        rows_qs = DataRow.objects.filter(scenario=scenario).order_by("index")
        page = self.paginate_queryset(rows_qs)
        # ``paginate_queryset`` returns None when pagination is switched off;
        # in that case serve every row in a plain (unpaginated) response.
        rows = list(rows_qs) if page is None else page
        if not rows:
            if page is None:
                return Response([])
            return self.get_paginated_response([])

        index_by_row_id = {row.id: row.index for row in rows}

        # Fetch every cell of the selected rows, with its column, in one query.
        cells = (
            DataCell.objects
            .filter(data_row_id__in=list(index_by_row_id))
            .select_related("data_column")
        )

        # Single pass: collect the distinct column names and group the cell
        # values by the index of the row they belong to.
        column_names = set()
        values_by_index = {}
        for cell in cells:
            column = cell.data_column.name
            column_names.add(column)
            row_index = index_by_row_id[cell.data_row_id]
            values_by_index.setdefault(row_index, {})[column] = cell.value

        columns = sorted(column_names)
        index_label = self._index_column_label(scenario)

        table = []
        for row_index in sorted(index_by_row_id.values()):
            row_values = values_by_index.get(row_index, {})
            record = {index_label: row_index}
            # Every row exposes every column; missing cells become None.
            for column in columns:
                record[column] = row_values.get(column)
            table.append(record)

        payload = TableSerializer(table).data
        if page is None:
            return Response(payload)
        return self.get_paginated_response(payload)

    @extend_schema(
        parameters=[
            OpenApiParameter(name="scenario", required=True,
                             type=OpenApiTypes.INT),
            OpenApiParameter(name="solve_index", required=True,
                             type=OpenApiTypes.INT),
        ],
        responses=TableSerializer()
    )
    @action(detail=False, methods=['get'], url_path='input-row')
    def input_row(self, request):
        """Return the input values of a single data row.

        The result is a one-element list holding the row index under
        ``Time step`` (dynamic scenarios) or ``Row`` plus one entry per
        data column of that row.

        Responds with 400 if a parameter is missing or ``solve_index`` is
        not an integer, and 404 if the scenario or the row does not exist.
        """
        scenario_id = request.query_params.get("scenario")
        raw_solve_index = request.query_params.get("solve_index")
        if not scenario_id or not raw_solve_index:
            return Response([], status=400)

        solve_index = self._parse_int(raw_solve_index)
        if solve_index is None:
            return Response([], status=400)

        scenario = self._lookup_scenario(scenario_id)
        if scenario is None:
            return Response([], status=404)

        try:
            data_row = DataRow.objects.get(scenario=scenario, index=solve_index)
        except DataRow.DoesNotExist:
            # An unknown row index is a client error, not a server failure.
            return Response([], status=404)

        # Fetch all cells of this row, with their columns, in one query.
        cells = DataCell.objects.filter(
            data_row_id=data_row.id
        ).select_related("data_column")

        input_row_data = {self._index_column_label(scenario): data_row.index}
        input_row_data.update(
            {cell.data_column.name: cell.value for cell in cells})

        return Response(TableSerializer([input_row_data]).data)

    @extend_schema(
        parameters=[
            OpenApiParameter(name="scenario", required=True,
                             type=OpenApiTypes.INT),
            OpenApiParameter(name="solve_index", required=True,
                             type=OpenApiTypes.INT),
            OpenApiParameter(name="simulation_object",
                             required=True, type=OpenApiTypes.INT),
        ],
        responses=TableSerializer()
    )
    @action(detail=False, methods=['get'], url_path='output-row')
    def output_row(self, request):
        """Return the solved output values of one simulation object for one solve.

        The result is a one-element list holding a mapping from property
        display name to value. Properties that have no value at the
        requested index are left out.

        Responds with 400 if a parameter is missing or ``solve_index`` is
        not an integer, and 404 if the scenario does not exist.
        """
        scenario_id = request.query_params.get("scenario")
        raw_solve_index = request.query_params.get("solve_index")
        simulation_object = request.query_params.get("simulation_object")
        if not scenario_id or not raw_solve_index or not simulation_object:
            return Response([], status=400)

        solve_index = self._parse_int(raw_solve_index)
        if solve_index is None:
            return Response([], status=400)

        scenario = self._lookup_scenario(scenario_id)
        if scenario is None:
            return Response([], status=404)

        # Join both property levels up front: the loop below reads
        # ``sol.property.property.displayName`` for every solution.
        solutions = Solution.objects.filter(
            scenario=scenario,
            property__property__set__simulationObject_id=simulation_object,
        ).select_related("property", "property__property").order_by("solve_index")

        if scenario.enable_dynamics:
            # Dynamic: the requested index is a position inside ``values``.
            position = solve_index
        else:
            # Steady state: one solution per solve index, value stored first.
            solutions = solutions.filter(solve_index=solve_index)
            position = 0

        output_value_map = {}
        for sol in solutions:
            # More rows may have been uploaded than there are solved steps,
            # so skip positions that do not exist. The lower bound stops a
            # negative index from silently reading from the end.
            if 0 <= position < len(sol.values):
                output_value_map[sol.property.property.displayName] = sol.values[position]

        return Response(TableSerializer([output_value_map]).data)

    @extend_schema(
        parameters=[
            OpenApiParameter(name="scenario", required=True,
                             type=OpenApiTypes.INT),
            OpenApiParameter(name="simulation_object",
                             required=True, type=OpenApiTypes.INT),
        ],
        responses=TableSerializer()
    )
    @action(detail=False, methods=['get'], url_path='output-graph')
    def output_graph(self, request):
        """Return the output values of one simulation object as graph series.

        The result is a list of series; each series is a list of
        ``{"index", "key", "value"}`` points. For dynamic scenarios there
        is one series per solution, indexed by position in its values. For
        other scenarios the points are grouped by key and indexed by solve
        index.

        Responds with 400 if a parameter is missing and 404 if the
        scenario does not exist.
        """
        scenario_id = request.query_params.get("scenario")
        simulation_object = request.query_params.get("simulation_object")

        # Validate before touching the database so that a missing parameter
        # is reported as 400 rather than as an unknown scenario.
        if not scenario_id or not simulation_object:
            return Response([], status=400)

        scenario = self._lookup_scenario(scenario_id)
        if scenario is None:
            return Response([], status=404)

        solutions = Solution.objects.filter(
            scenario_id=scenario_id,
            property__property__set__simulationObject_id=simulation_object
        ).select_related(
            "property", "property__property"
        ).prefetch_related(
            "property__indexedItems"
        ).order_by("solve_index")

        if scenario.enable_dynamics:
            output_solutions = []
            for sol in solutions:
                key = self._series_key(sol.property)
                output_solutions.append([
                    {"index": step, "key": key, "value": value}
                    for step, value in enumerate(sol.values)
                ])
        else:
            series_by_key = {}
            for sol in solutions:
                key = self._series_key(sol.property)
                series_by_key.setdefault(key, []).append({
                    "index": sol.solve_index,
                    "key": key,
                    "value": sol.values[0]  # for mss, values has only one value
                })
            output_solutions = list(series_by_key.values())

        return Response(TableSerializer(output_solutions, many=True).data)