Coverage for backend/django/pinch_factory/pinch_factory.py: 81%

158 statements  

coverage.py v7.10.7, created at 2025-12-18 04:00 +0000

  1  import json
  2  import os
  3  import traceback
  4  from typing import Any, Dict
  5  import requests
  6
  7  from PinchAnalysis.models.OutputModels import GraphDataPoint, HeatReceiverUtilitySummary, HeatSupplierUtilitySummary, PinchCurve, PinchGraph, PinchGraphSet, PinchTemp, TargetSummary
  8  from PinchAnalysis.models.StreamDataProject import StreamDataProject
  9  from PinchAnalysis.models.InputModels import PinchUtility, Segment
 10  from core.auxiliary.enums.generalEnums import AbstractionType
 11  from core.auxiliary.enums import pinchEnums
 12  from PinchAnalysis.serializers.PinchInputSerializers import SegmentSerializer
 13  from core.auxiliary.models.Flowsheet import Flowsheet
 14
 15  class PinchFactory:
 16      def __init__(self, flowsheet_id: int, num_intervals: int = 20, t_min: float = 1) -> None:
 17          self.project = Flowsheet.objects.get(pk=flowsheet_id).StreamDataProject
 18          self.flowsheet = self.project.flowsheet
 19
 20          # Sampling intervals for stream linearisation
 21          self.num_intervals = num_intervals
 22          # Maximum temperature difference between actual stream samples and linearisation curve
 23          self.t_min = t_min
 24

 25      def build_calculate_request(self, excluded_segments: list[int]) -> dict:
 26          """
 27          Extract and prepare inputs from the project data.
 28          """
 29          request_data = {}
 30          request_data['streams'] = []
 31          request_data['utilities'] = []
 32          for segment in Segment.objects.all():
 33              if segment.id in excluded_segments:
 34                  continue
 35              request_data['streams'].append({
 36                  "zone": segment.zone,
 37                  "name": segment.name,
 38                  "t_supply": segment.t_supply,
 39                  "t_target": segment.t_target,
 40                  "heat_flow": segment.heat_flow,
 41                  "dt_cont": segment.dt_cont,
 42                  "htc": segment.htc,
 43              })
 44          for utility in PinchUtility.objects.all():    [44 ↛ 45: the loop on line 44 never started]
 45              request_data['utilities'].append({
 46                  "name": utility.name,
 47                  "type": utility.type,
 48                  "t_supply": utility.t_supply,
 49                  "t_target": utility.t_target,
 50                  "heat_flow": utility.heat_flow,
 51                  "dt_cont": utility.dt_cont,
 52                  "htc": utility.htc,
 53                  "price": utility.price,
 54              })
 55
 56          request_data['options'] = {
 57              'main': [prop.key for prop in self.project.Options.selections.containedProperties.all() if prop.get_value() is True],
 58              'turbine': [{"key": prop.key, "value": prop.get_value()} for prop in self.project.Options.turbine_options.properties.containedProperties.all()]
 59          }
 60          request_data['zone_tree'] = self.build_zone_structure()
 61
 62          return request_data
 63
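The request body assembled above ends up shaped roughly as follows. This is a minimal sketch only: the keys come from the loops above, but every value, option key, and zone name is made up for illustration.

    # Illustrative /calculate payload; every value below is hypothetical.
    calculate_payload = {
        "streams": [
            {
                "zone": "Zone A",
                "name": "Hot 1",
                "t_supply": 180.0,
                "t_target": 60.0,
                "heat_flow": 1200.0,
                "dt_cont": 5.0,
                "htc": 0.5,
            },
        ],
        "utilities": [],    # empty whenever no PinchUtility rows exist, as in the covered run
        "options": {
            "main": ["some_option_key"],    # keys of boolean options that are switched on
            "turbine": [{"key": "some_turbine_option", "value": 0.75}],
        },
        "zone_tree": {"name": "Site", "type": "zone", "children": []},
    }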

 64      def build_zone_structure(self) -> list:
 65          # Step 1: Collect all groups related to the StreamDataEntries
 66          all_groups = {}
 67          for stream_data_entry in self.project.StreamDataEntries.all():
 68              group = stream_data_entry.group
 69              zone = stream_data_entry.zone
 70              all_groups[zone] = {
 71                  "children": [],
 72                  "group": group,
 73              }
 74          # Step 2: Organize into tree
 75          root_node = None
 76          parent_zones = {}
 77          for zone_data in all_groups.values():
 78              group = zone_data["group"]
 79              parent_group = group.get_parent_group()
 80              if parent_group:    [80 ↛ 81: the condition on line 80 was never true]
 81                  parent_zone = parent_group.simulationObject.componentName
 82                  if parent_zone in all_groups:
 83                      all_groups[parent_zone]["children"].append(zone_data)
 84                  else:
 85                      if parent_zone not in parent_zones:
 86                          parent_zones[parent_zone] = {
 87                              "children": [],
 88                              "group": parent_group,
 89                          }
 90                      parent_zones[parent_zone]["children"].append(zone_data)
 91          for parent_zone_name, parent_zone_data in parent_zones.items():    [91 ↛ 92: the loop on line 91 never started]
 92              all_groups[parent_zone_name] = parent_zone_data
 93          for zone_data in all_groups.values():
 94              group = zone_data["group"]
 95              parent_group = group.get_parent_group()
 96              if not parent_group:    [96 ↛ 93: the condition on line 96 was always true]
 97                  root_node = zone_data
 98
 99          # Step 3: Format it nicely
100          def clean_node(node):
101              return {
102                  "name": node["group"].simulationObject.componentName,
103                  "type": node["group"].abstractionType,
104                  "children": [clean_node(child) for child in node["children"]],
105              }
106
107          if root_node is None:    [107 ↛ 108: the condition on line 107 was never true]
108              return None
109          return clean_node(root_node)
110
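Despite the `-> list` annotation, the method returns the nested dict built by clean_node (or None when no root group is found). A minimal sketch of that shape; the names and type strings are hypothetical:

    # Hypothetical zone tree as returned by clean_node; names and types are made up.
    example_zone_tree = {
        "name": "Site",
        "type": "zone",    # group.abstractionType of the root group
        "children": [
            {"name": "Process Area 1", "type": "zone", "children": []},
            {"name": "Process Area 2", "type": "zone", "children": []},
        ],
    }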

111      def build_linearize_request(self, t_h_data, streams_io_props: list[dict], mole_flow: float, ppKey: str) -> dict:
112          return (
113              {
114                  't_h_data': t_h_data,
115                  't_min': self.t_min,
116                  'num_intervals': self.num_intervals,
117                  'streams': streams_io_props,
118                  'mole_flow': mole_flow,
119                  'ppKey': ppKey
120              }
121          )
122
123      def build_t_h_request(self, streams_io_props: list[dict], mole_flow: float, ppKey: str, prev_states = None) -> dict:
124          return (
125              {
126                  't_min': self.t_min,
127                  'num_intervals': self.num_intervals,
128                  'streams': streams_io_props,
129                  'mole_flow': mole_flow,
130                  'ppKey': ppKey,
131                  'prev_states': prev_states,
132              }
133          )
134
135      def clear_outputs(self) -> None:
136          """
137          Removes the previous outputs from the project
138          """
139          output_owner = self.project.Outputs
140          output_owner.targets.all().delete()
141          output_owner.graph_sets.all().delete()
142

143      def run_calculate(self, excluded_segments: list[int]) -> Dict[str, Any]:
144          """
145          Format data and send request to pinch service
146          """
147          try:
148              request_data = self.build_calculate_request(excluded_segments)
149              # print(request_data)
150              url = (os.getenv('PINCH_SERVICE_URL') or "http://localhost:8082") + "/" + "calculate"
151              result = requests.post(url, json=request_data)
152              if result.status_code != 200:    [152 ↛ 153: the condition on line 152 was never true]
153                  raise Exception(result.json())
154              self.clear_outputs()
155              response_data = result.json()
156              self.serialize_return_data(response_data)
157
158          except Exception as e:
159              print("Error during calculation:", e)
160              print("Traceback:", traceback.format_exc())
161              raise RuntimeError("Calculation error occurred.") from e
162
163      def run_linearize(self, t_h_data, streams_io_props: list[dict], mole_flow: float, ppKey: str, **_):
164          """
165          Linearizes a stream curve
166          """
167          try:
168              url = (os.getenv('PINCH_SERVICE_URL') or "http://localhost:8082") + "/" + "linearize"
169              request_data = self.build_linearize_request(t_h_data, streams_io_props, mole_flow, ppKey)
170              # print(request_data)
171              result = requests.post(url, json=request_data)
172              if result.status_code != 200:    [172 ↛ 173: the condition on line 172 was never true]
173                  raise Exception(result.json())
174              response_data = result.json()
175              return self.get_linear_streams(response_data)
176          except Exception as e:
177              print("Error during calculation:", e)
178              print("Traceback:", traceback.format_exc())
179              raise RuntimeError("Calculation error occurred.") from e
180

181      def serialize_return_data(self, response_data):
182          """
183          Converts output data to db entries, including nested objects.
184          """
185          try:
186              output_owner = self.project.Outputs
187              targets = response_data.get('targets', None)
188              graphs = response_data.get('graphs', None)
189
190              # Target Objects
191              if targets:    [191 ↛ 237: the condition on line 191 was always true]
192                  target_list = []
193                  heat_suppliers = []
194                  heat_receivers = []
195                  for entry in targets:
196                      # Pop data that should not be included in the target creation
197                      temp_pinch_data = entry.pop('temp_pinch', None)
198                      hot_utilities = entry.pop('hot_utilities', [])
199                      cold_utilities = entry.pop('cold_utilities', [])
200
201                      # print('temp_pinch_data', temp_pinch_data)
202
203                      temp_pinch = PinchTemp.objects.create(**temp_pinch_data, flowsheet=self.flowsheet) if temp_pinch_data else None
204
205                      # Create TargetSummary
206                      target = TargetSummary(
207                          output_owner=output_owner,
208                          temp_pinch=temp_pinch,
209                          **entry,
210                          flowsheet=self.flowsheet
211                      )
212                      # print(target)
213                      target.save()
214
215                      for supplier_utility in hot_utilities:
216                          heat_suppliers.append(HeatSupplierUtilitySummary(
217                              summary_owner=target,
218                              **supplier_utility,
219                              flowsheet=self.flowsheet
220                          ))
221
222                      for receiver_utility in cold_utilities:
223                          heat_receivers.append(HeatReceiverUtilitySummary(
224                              summary_owner=target,
225                              **receiver_utility,
226                              flowsheet=self.flowsheet
227                          ))
228
229                      target_list.append(target)
230
231                  # Bulk create objects
232                  TargetSummary.objects.bulk_create(target_list, ignore_conflicts=True)
233                  HeatSupplierUtilitySummary.objects.bulk_create(heat_suppliers)
234                  HeatReceiverUtilitySummary.objects.bulk_create(heat_receivers)
235
236              # Graphs
237              if graphs:    [237 ↛ exit: didn't return from 'serialize_return_data' because the condition on line 237 was always true]
238                  graph_set_list = []
239                  graph_list = []
240                  curve_list = []
241                  data_point_list = []
242
243                  for key, graph_set_data in graphs.items():
244                      graph_set = PinchGraphSet(output_owner=output_owner, name=graph_set_data.get('name'), flowsheet=self.flowsheet)
245                      graph_set_list.append(graph_set)
246
247                      # Create nested graphs
248                      for graph_data in graph_set_data.get('graphs', []):
249                          graph = PinchGraph(
250                              graph_set=graph_set,
251                              name=graph_data.get('name'),
252                              type=graph_data.get('type', pinchEnums.GraphType.CC),
253                              flowsheet=self.flowsheet
254                          )
255                          graph_list.append(graph)
256
257                          # Create nested curves
258                          for segment_data in graph_data.get('segments', []):
259                              curve = PinchCurve(
260                                  graph=graph,
261                                  title=segment_data.get('title'),
262                                  colour=segment_data.get('colour', pinchEnums.LineColour.Hot),
263                                  arrow=segment_data.get('arrow', pinchEnums.ArrowHead.NO_ARROW),
264                                  flowsheet=self.flowsheet
265                              )
266                              curve_list.append(curve)
267
268                              for point in segment_data.get('data_points', []):
269                                  data_point = GraphDataPoint(
270                                      curve=curve,
271                                      x=point.get('x'),
272                                      y=point.get('y'),
273                                      flowsheet=self.flowsheet
274                                  )
275                                  data_point_list.append(data_point)
276
277                  # Bulk create the objects
278                  PinchGraphSet.objects.bulk_create(graph_set_list)
279                  PinchGraph.objects.bulk_create(graph_list)
280                  PinchCurve.objects.bulk_create(curve_list)
281                  GraphDataPoint.objects.bulk_create(data_point_list)
282
283          except Exception as e:
284              raise RuntimeError("Serialization error occurred.") from e
285
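For reference, serialize_return_data consumes a response shaped roughly as sketched below. The key names are taken from the parsing above; the inner field dicts, enum strings, and the graph-set key are placeholders, since the actual model fields and enum serialization are not shown in this file.

    # Sketch of the response consumed by serialize_return_data; values are illustrative.
    example_response = {
        "targets": [
            {
                "temp_pinch": {},        # PinchTemp fields go here (popped before TargetSummary creation)
                "hot_utilities": [],     # list of HeatSupplierUtilitySummary field dicts
                "cold_utilities": [],    # list of HeatReceiverUtilitySummary field dicts
                # any remaining keys are passed straight into TargetSummary(**entry)
            },
        ],
        "graphs": {
            "some_graph_set": {          # the dict key itself is not used by the loop
                "name": "Composite Curves",
                "graphs": [
                    {
                        "name": "CC",
                        "type": "CC",    # defaults to pinchEnums.GraphType.CC when absent
                        "segments": [
                            {
                                "title": "Hot composite",
                                "colour": "Hot",        # default: pinchEnums.LineColour.Hot
                                "arrow": "NO_ARROW",    # default: pinchEnums.ArrowHead.NO_ARROW
                                "data_points": [{"x": 0.0, "y": 100.0}],
                            },
                        ],
                    },
                ],
            },
        },
    }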

286      def get_linear_streams(self, response_data):
287          return response_data['streams']
288
289      def run_get_t_h_data(self, streams_io_props: list[dict], mole_flow: float, ppKey: str, prev_states = None, **_):
290          """
291          Get t_h data from streams
292          """
293          try:
294              url = (os.getenv('PINCH_SERVICE_URL') or "http://localhost:8082") + "/" + "generate_t_h_curve"
295              request_data = self.build_t_h_request(streams_io_props, mole_flow, ppKey, prev_states)
296              result = requests.post(url, json=request_data)
297              if result.status_code != 200:    [297 ↛ 298: the condition on line 297 was never true]
298                  raise Exception(result.json())
299              response_data = result.json()
300              return response_data
301          except Exception as e:
302              print("Error during calculation:", e)
303              print("Traceback:", traceback.format_exc())
304              raise RuntimeError("Calculation error occurred.") from e
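Taken together, the class is driven by constructing it for a flowsheet and calling one of the run_* methods. A minimal usage sketch, assuming a reachable pinch service and valid database objects; the flowsheet id, segment ids, and stream properties below are hypothetical:

    # Hypothetical driver code for PinchFactory.
    factory = PinchFactory(flowsheet_id=42, num_intervals=20, t_min=1)

    # Builds the payload, POSTs it to <PINCH_SERVICE_URL>/calculate, clears the
    # project's previous outputs and persists the returned targets and graphs.
    factory.run_calculate(excluded_segments=[3, 7])

    # Requests a T-H curve; the contents of streams_io_props and the ppKey
    # depend entirely on the calling code and property-package setup.
    t_h_data = factory.run_get_t_h_data(
        streams_io_props=[{"name": "Hot 1"}],
        mole_flow=10.0,
        ppKey="pp-1",
    )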