Coverage for backend/core/auxiliary/viewsets/SolveValueViewSet.py: 21%

155 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1from core.viewset import ModelViewSet 

2from rest_framework.response import Response 

3from core.auxiliary.models.SolveState import SolveValue, SolveState 

4from core.auxiliary.models.Solution import Solution 

5from core.auxiliary.models.Scenario import Scenario 

6from core.auxiliary.serializers.SolveStateSerializer import SolveValueSerializer, SolveStateSerializer 

7from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes 

8from rest_framework.decorators import action 

9from rest_framework import serializers 

10from core.pagination import ViewSetPagination 

11 

12 

13class TableSerializer(serializers.ListSerializer): 

14 child = serializers.DictField() 

15 

16 

17class ResultTableSerializer(serializers.Serializer): 

18 input = serializers.ListField( 

19 child=serializers.DictField() 

20 ) 

21 output = serializers.ListField( 

22 child=serializers.DictField() 

23 ) 

24 

25class SolveValueViewSet(ModelViewSet): 

26 

27 serializer_class = SolveValueSerializer 

28 

29 # Filter to only show values for a specific property (propertyInfo) 

30 def get_queryset(self): 

31 queryset = SolveValue.objects.all() 

32 expression_id = self.request.query_params.get("expression") 

33 if expression_id is not None: 

34 queryset = queryset.filter(expression_id=expression_id) 

35 return queryset 

36 

37 @extend_schema( 

38 parameters=[ 

39 OpenApiParameter(name="expression", required=True, 

40 type=OpenApiTypes.INT), 

41 ] 

42 ) 

43 def list(self, request): 

44 return super().list(request) 

45 

46 

47class SolveStateViewSet(ModelViewSet): 

48 serializer_class = SolveStateSerializer 

49 pagination_class = ViewSetPagination 

50 

51 def get_queryset(self): 

52 queryset = SolveState.objects.all() 

53 optimization = self.request.query_params.get("optimization") 

54 if optimization is not None: 

55 queryset = queryset.filter(scenario_id=optimization) 

56 return queryset 

57 

58 @extend_schema( 

59 parameters=[ 

60 OpenApiParameter(name="optimization", 

61 required=True, type=OpenApiTypes.INT), 

62 ] 

63 ) 

64 def list(self, request): 

65 return super().list(request) 

66 

67 @extend_schema( 

68 parameters=[ 

69 OpenApiParameter(name="optimization", 

70 required=True, type=OpenApiTypes.INT), 

71 ], 

72 responses={200: OpenApiTypes.INT}, 

73 ) 

74 @action(detail=False, methods=['get'], url_path='count') 

75 def solve_state_count(self, request): 

76 queryset = SolveState.objects.filter( 

77 scenario_id=request.query_params.get("optimization")) 

78 count = queryset.count() 

79 return Response(count) 

80 

81 @extend_schema( 

82 parameters=[ 

83 OpenApiParameter(name="scenario", required=True, 

84 type=OpenApiTypes.INT), 

85 ], 

86 responses=TableSerializer() 

87 ) 

88 @action(detail=False, methods=['get'], url_path='full-table') 

89 def full_input_table(self, request): 

90 scenario_id = request.query_params.get("scenario") 

91 

92 if not scenario_id: 

93 return Response([], status=400) 

94 

95 try: 

96 scenario = Scenario.objects.get(id=scenario_id) 

97 except Scenario.DoesNotExist: 

98 return Response([], status=404) 

99 

100 is_dynamic = scenario.enable_dynamics 

101 

102 rows_qs = SolveState.objects.filter( 

103 scenario=scenario).order_by("index") 

104 paginated_rows = self.paginate_queryset(rows_qs) 

105 if not paginated_rows: 

106 return self.get_paginated_response([]) 

107 

108 solve_ids = [r.id for r in paginated_rows] 

109 solve_index_map = {r.id: r.index for r in paginated_rows} 

110 

111 # 2️⃣ Get all SolveValues for those solves in ONE query 

112 solve_values_qs = ( 

113 SolveValue.objects 

114 .filter(solve_id__in=solve_ids) 

115 .select_related("expression") # avoid N+1 lookups 

116 ) 

117 

118 # 3️⃣ Build a set of all columns (distinct expression names) 

119 columns = sorted({sv.expression.name for sv in solve_values_qs}) 

120 

121 # 4️⃣ Pre-group SolveValues in memory by solve_id 

122 values_by_solve = {} 

123 for sv in solve_values_qs: 

124 idx = solve_index_map[sv.solve_id] 

125 if idx not in values_by_solve: 

126 values_by_solve[idx] = {} 

127 values_by_solve[idx][sv.expression.name] = sv.value 

128 

129 # 5️⃣ Build the table data 

130 data = [] 

131 for idx in sorted(solve_index_map.values()): 

132 if is_dynamic: 

133 row_data = {"Time step": idx} 

134 else: 

135 row_data = {"Row": idx} 

136 for col in columns: 

137 row_data[col] = values_by_solve.get(idx, {}).get(col) 

138 data.append(row_data) 

139 

140 response = self.get_paginated_response(TableSerializer(data).data) 

141 

142 return response 

143 

144 @extend_schema( 

145 parameters=[ 

146 OpenApiParameter(name="scenario", required=True, 

147 type=OpenApiTypes.INT), 

148 OpenApiParameter(name="solve_index", required=True, 

149 type=OpenApiTypes.INT), 

150 ], 

151 responses=TableSerializer() 

152 ) 

153 @action(detail=False, methods=['get'], url_path='input-row') 

154 def input_row(self, request): 

155 scenario_id = request.query_params.get("scenario") 

156 solve_index = request.query_params.get("solve_index") 

157 

158 if not scenario_id or not solve_index: 

159 return Response([], status=400) 

160 

161 try: 

162 scenario = Scenario.objects.get(id=scenario_id) 

163 except Scenario.DoesNotExist: 

164 return Response([], status=404) 

165 

166 is_dynamic = scenario.enable_dynamics 

167 

168 solve_state = SolveState.objects.get( 

169 scenario=scenario, index=solve_index) 

170 

171 # Pull all values for this solve in one query 

172 solve_values_qs = SolveValue.objects.filter( 

173 solve_id=solve_state.id 

174 ).select_related("expression") 

175 

176 # Build a dictionary {column_name: value} in one pass 

177 input_value_map = { 

178 sv.expression.name: sv.value for sv in solve_values_qs} 

179 

180 # Build final row 

181 if is_dynamic: 

182 input_row_data = {"Time step": solve_state.index} 

183 else: 

184 input_row_data = {"Row": solve_state.index} 

185 

186 input_row_data.update(input_value_map) 

187 

188 return Response(TableSerializer([input_row_data]).data) 

189 

190 @extend_schema( 

191 parameters=[ 

192 OpenApiParameter(name="scenario", required=True, 

193 type=OpenApiTypes.INT), 

194 OpenApiParameter(name="solve_index", required=True, 

195 type=OpenApiTypes.INT), 

196 OpenApiParameter(name="simulation_object", 

197 required=True, type=OpenApiTypes.INT), 

198 ], 

199 responses=TableSerializer() 

200 ) 

201 @action(detail=False, methods=['get'], url_path='output-row') 

202 def output_row(self, request): 

203 scenario_id = request.query_params.get("scenario") 

204 solve_index = request.query_params.get("solve_index") 

205 simulation_object = request.query_params.get("simulation_object") 

206 

207 if not scenario_id or not solve_index or not simulation_object: 

208 return Response([], status=400) 

209 

210 solve_index = int(solve_index) 

211 

212 try: 

213 scenario = Scenario.objects.get(id=scenario_id) 

214 except Scenario.DoesNotExist: 

215 return Response([], status=404) 

216 

217 is_dynamic = scenario.enable_dynamics 

218 

219 solutions = Solution.objects.filter( 

220 scenario=scenario, 

221 property__property__set__simulationObject_id=simulation_object, 

222 ).select_related("property").order_by("solve_index") 

223 

224 if not is_dynamic: 

225 solutions = solutions.filter(solve_index=solve_index) 

226 # Build a dictionary {column_name: value} in one pass 

227 output_value_map = { 

228 sol.property.property.displayName: sol.values[0] for sol in solutions} 

229 else: 

230 output_value_map = {} 

231 for sol in solutions: 

232 # uploaded more rows than actual time step -> 

233 # if access solve index where time step doesn't exist 

234 if solve_index < len(sol.values): 

235 output_value_map[sol.property.property.displayName] = sol.values[solve_index] 

236 

237 return Response(TableSerializer([output_value_map]).data) 

238 

239 @extend_schema( 

240 parameters=[ 

241 OpenApiParameter(name="scenario", required=True, 

242 type=OpenApiTypes.INT), 

243 OpenApiParameter(name="simulation_object", 

244 required=True, type=OpenApiTypes.INT), 

245 ], 

246 responses=TableSerializer() 

247 ) 

248 @action(detail=False, methods=['get'], url_path='output-graph') 

249 def output_graph(self, request): 

250 scenario_id = request.query_params.get("scenario") 

251 simulation_object = request.query_params.get("simulation_object") 

252 

253 try: 

254 scenario = Scenario.objects.get(id=scenario_id) 

255 except Scenario.DoesNotExist: 

256 return Response([], status=404) 

257 

258 is_dynamic = scenario.enable_dynamics 

259 

260 if not scenario_id or not simulation_object: 

261 return Response([], status=400) 

262 

263 solutions = Solution.objects.filter( 

264 scenario_id=scenario_id, 

265 property__property__set__simulationObject_id=simulation_object 

266 ).select_related("property", "property__property").prefetch_related("property__indexedItems").order_by("solve_index") 

267 

268 output_solutions = [] 

269 if is_dynamic: 

270 for sol in solutions: 

271 property_value = sol.property 

272 indexItem = property_value.indexedItems.first() 

273 key = f"{property_value.property.displayName} {indexItem.displayName}" if indexItem else property_value.property.displayName 

274 arr = [] 

275 for i, value in enumerate(sol.values): 

276 arr.append({ 

277 "index": i, 

278 "key": key, 

279 "value": value 

280 }) 

281 output_solutions.append(arr) 

282 else: 

283 prop_key_mapping = {} 

284 for sol in solutions: 

285 property_value = sol.property 

286 indexItem = property_value.indexedItems.first() 

287 key = f"{property_value.property.displayName} {indexItem.displayName}" if indexItem else property_value.property.displayName 

288 

289 if key not in prop_key_mapping: 

290 prop_key_mapping[key] = [] 

291 

292 prop_key_mapping[key].append({ 

293 "index": sol.solve_index, 

294 "key": key, 

295 "value": sol.values[0] # for mss, values has only one value 

296 }) 

297 output_solutions = list(prop_key_mapping.values()) 

298 

299 return Response(TableSerializer(output_solutions, many=True).data)