Coverage for backend/django/core/auxiliary/viewsets/DataRowViewSet.py: 80%

136 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-02-11 21:43 +0000

1from core.viewset import ModelViewSet 

2from rest_framework.response import Response 

3from core.auxiliary.models.DataCell import DataCell 

4from core.auxiliary.models.DataRow import DataRow 

5from core.auxiliary.models.Solution import Solution 

6from core.auxiliary.models.Scenario import Scenario 

7from core.auxiliary.serializers.DataRowSerializer import DataRowSerializer 

8from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes 

9from rest_framework.decorators import action 

10from rest_framework import serializers 

11from core.pagination import ViewSetPagination 

12 

13 

14class TableSerializer(serializers.ListSerializer): 

15 child = serializers.DictField() 

16 

17class DataRowViewSet(ModelViewSet): 

18 serializer_class = DataRowSerializer 

19 pagination_class = ViewSetPagination 

20 

21 def get_queryset(self): 

22 queryset = DataRow.objects.all() 

23 scenario = self.request.query_params.get("scenario") 

24 if scenario is not None: 

25 queryset = queryset.filter(scenario_id=scenario) 

26 return queryset 

27 

28 @extend_schema( 

29 parameters=[ 

30 OpenApiParameter(name="scenario", 

31 required=True, type=OpenApiTypes.INT), 

32 ] 

33 ) 

34 def list(self, request): 

35 return super().list(request) 

36 

37 @extend_schema( 

38 parameters=[ 

39 OpenApiParameter(name="scenario", required=True, 

40 type=OpenApiTypes.INT), 

41 ], 

42 responses=TableSerializer() 

43 ) 

44 @action(detail=False, methods=['get'], url_path='full-table') 

45 def full_input_table(self, request): 

46 scenario_id = request.query_params.get("scenario") 

47 

48 if not scenario_id: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true

49 return Response([], status=400) 

50 

51 try: 

52 scenario = Scenario.objects.get(id=scenario_id) 

53 except Scenario.DoesNotExist: 

54 return Response([], status=404) 

55 

56 is_dynamic = scenario.enable_dynamics 

57 

58 rows_qs = DataRow.objects.filter( 

59 scenario=scenario).order_by("index") 

60 paginated_rows = self.paginate_queryset(rows_qs) 

61 if not paginated_rows: 

62 return self.get_paginated_response([]) 

63 

64 row_ids = [r.id for r in paginated_rows] 

65 row_index_map = {r.id: r.index for r in paginated_rows} 

66 

67 # 2️⃣ Get all DataCells for those rows in ONE query 

68 data_cells_qs = ( 

69 DataCell.objects 

70 .filter(data_row_id__in=row_ids) 

71 .select_related("data_column") # avoid N+1 lookups 

72 ) 

73 

74 # 3️⃣ Build a set of all columns (distinct column names) 

75 columns = sorted({dc.data_column.name for dc in data_cells_qs}) 

76 # 4️⃣ Pre-group DataCells in memory by row_id 

77 values_by_row = {} 

78 for dc in data_cells_qs: 

79 idx = row_index_map[dc.data_row_id] 

80 if idx not in values_by_row: 

81 values_by_row[idx] = {} 

82 values_by_row[idx][dc.data_column.name] = dc.value 

83 

84 # 5️⃣ Build the table data 

85 data = [] 

86 for idx in sorted(row_index_map.values()): 

87 if is_dynamic: 

88 row_data = {"Time step": idx} 

89 else: 

90 row_data = {"Row": idx} 

91 for col in columns: 

92 row_data[col] = values_by_row.get(idx, {}).get(col) 

93 data.append(row_data) 

94 

95 response = self.get_paginated_response(TableSerializer(data).data) 

96 

97 return response 

98 

99 @extend_schema( 

100 parameters=[ 

101 OpenApiParameter(name="scenario", required=True, 

102 type=OpenApiTypes.INT), 

103 OpenApiParameter(name="solve_index", required=True, 

104 type=OpenApiTypes.INT), 

105 ], 

106 responses=TableSerializer() 

107 ) 

108 @action(detail=False, methods=['get'], url_path='input-row') 

109 def input_row(self, request): 

110 scenario_id = request.query_params.get("scenario") 

111 solve_index = request.query_params.get("solve_index") 

112 

113 if not scenario_id or not solve_index: 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true

114 return Response([], status=400) 

115 

116 try: 

117 scenario = Scenario.objects.get(id=scenario_id) 

118 except Scenario.DoesNotExist: 

119 return Response([], status=404) 

120 

121 is_dynamic = scenario.enable_dynamics 

122 

123 data_row = DataRow.objects.get( 

124 scenario=scenario, index=solve_index) 

125 

126 # Pull all values for this solve in one query 

127 data_cells_qs = DataCell.objects.filter( 

128 data_row_id=data_row.id 

129 ).select_related("data_column") 

130 

131 # Build a dictionary {column_name: value} in one pass 

132 input_value_map = { 

133 dc.data_column.name: dc.value for dc in data_cells_qs} 

134 

135 # Build final row 

136 if is_dynamic: 136 ↛ 137line 136 didn't jump to line 137 because the condition on line 136 was never true

137 input_row_data = {"Time step": data_row.index} 

138 else: 

139 input_row_data = {"Row": data_row.index} 

140 

141 input_row_data.update(input_value_map) 

142 

143 return Response(TableSerializer([input_row_data]).data) 

144 

145 @extend_schema( 

146 parameters=[ 

147 OpenApiParameter(name="scenario", required=True, 

148 type=OpenApiTypes.INT), 

149 OpenApiParameter(name="solve_index", required=True, 

150 type=OpenApiTypes.INT), 

151 OpenApiParameter(name="simulation_object", 

152 required=True, type=OpenApiTypes.INT), 

153 ], 

154 responses=TableSerializer() 

155 ) 

156 @action(detail=False, methods=['get'], url_path='output-row') 

157 def output_row(self, request): 

158 scenario_id = request.query_params.get("scenario") 

159 solve_index = request.query_params.get("solve_index") 

160 simulation_object = request.query_params.get("simulation_object") 

161 

162 if not scenario_id or not solve_index or not simulation_object: 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true

163 return Response([], status=400) 

164 

165 solve_index = int(solve_index) 

166 

167 try: 

168 scenario = Scenario.objects.get(id=scenario_id) 

169 except Scenario.DoesNotExist: 

170 return Response([], status=404) 

171 

172 is_dynamic = scenario.enable_dynamics 

173 

174 solutions = Solution.objects.filter( 

175 scenario=scenario, 

176 property__property__set__simulationObject_id=simulation_object, 

177 ).select_related("property").order_by("solve_index") 

178 

179 if not is_dynamic: 179 ↛ 185line 179 didn't jump to line 185 because the condition on line 179 was always true

180 solutions = solutions.filter(solve_index=solve_index) 

181 # Build a dictionary {column_name: value} in one pass 

182 output_value_map = { 

183 sol.property.property.displayName: sol.values[0] for sol in solutions} 

184 else: 

185 output_value_map = {} 

186 for sol in solutions: 

187 # uploaded more rows than actual time step -> 

188 # if access solve index where time step doesn't exist 

189 if solve_index < len(sol.values): 

190 output_value_map[sol.property.property.displayName] = sol.values[solve_index] 

191 

192 return Response(TableSerializer([output_value_map]).data) 

193 

194 @extend_schema( 

195 parameters=[ 

196 OpenApiParameter(name="scenario", required=True, 

197 type=OpenApiTypes.INT), 

198 OpenApiParameter(name="simulation_object", 

199 required=True, type=OpenApiTypes.INT), 

200 ], 

201 responses=TableSerializer() 

202 ) 

203 @action(detail=False, methods=['get'], url_path='output-graph') 

204 def output_graph(self, request): 

205 scenario_id = request.query_params.get("scenario") 

206 simulation_object = request.query_params.get("simulation_object") 

207 

208 try: 

209 scenario = Scenario.objects.get(id=scenario_id) 

210 except Scenario.DoesNotExist: 

211 return Response([], status=404) 

212 

213 is_dynamic = scenario.enable_dynamics 

214 

215 if not scenario_id or not simulation_object: 215 ↛ 216line 215 didn't jump to line 216 because the condition on line 215 was never true

216 return Response([], status=400) 

217 

218 solutions = Solution.objects.filter( 

219 scenario_id=scenario_id, 

220 property__property__set__simulationObject_id=simulation_object 

221 ).select_related("property", "property__property").prefetch_related("property__indexedItems").order_by("solve_index") 

222 

223 output_solutions = [] 

224 if is_dynamic: 

225 for sol in solutions: 

226 property_value = sol.property 

227 indexItem = property_value.indexedItems.first() 

228 key = f"{property_value.property.displayName} {indexItem.displayName}" if indexItem else property_value.property.displayName 

229 arr = [] 

230 for i, value in enumerate(sol.values): 

231 arr.append({ 

232 "index": i, 

233 "key": key, 

234 "value": value 

235 }) 

236 output_solutions.append(arr) 

237 else: 

238 prop_key_mapping = {} 

239 for sol in solutions: 

240 property_value = sol.property 

241 indexItem = property_value.indexedItems.first() 

242 key = f"{property_value.property.displayName} {indexItem.displayName}" if indexItem else property_value.property.displayName 

243 

244 if key not in prop_key_mapping: 

245 prop_key_mapping[key] = [] 

246 

247 prop_key_mapping[key].append({ 

248 "index": sol.solve_index, 

249 "key": key, 

250 "value": sol.values[0] # for mss, values has only one value 

251 }) 

252 output_solutions = list(prop_key_mapping.values()) 

253 

254 return Response(TableSerializer(output_solutions, many=True).data)