Coverage for backend/django/pinch_factory/pinch_factory.py: 85%

159 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

import json
import logging
import os
import traceback
from typing import Any, Dict, Optional

import requests

from PinchAnalysis.models.OutputModels import GraphDataPoint, HeatReceiverUtilitySummary, HeatSupplierUtilitySummary, PinchCurve, PinchGraph, PinchGraphSet, PinchTemp, TargetSummary
from PinchAnalysis.models.StreamDataProject import StreamDataProject
from PinchAnalysis.models.InputModels import PinchUtility, Segment
from core.auxiliary.enums.generalEnums import AbstractionType
from core.auxiliary.enums import pinchEnums
from PinchAnalysis.serializers.PinchInputSerializers import SegmentSerializer
from core.auxiliary.models.Flowsheet import Flowsheet

14 

class PinchFactory:
    """Build requests for the pinch service and store its results.

    An instance is bound to the ``StreamDataProject`` of one flowsheet. The
    ``build_*`` methods assemble JSON-serialisable payloads, the ``run_*``
    methods POST them to the service located by the ``PINCH_SERVICE_URL``
    environment variable (default ``http://localhost:8082``), and
    ``serialize_return_data`` writes a calculation response to the database.
    """

    # (connect, read) timeout in seconds handed to ``requests.post``. Without a
    # timeout an unresponsive service would block the calling worker forever.
    # Override on a subclass or instance if calculations legitimately run longer.
    REQUEST_TIMEOUT = (10, 300)

    _logger = logging.getLogger(__name__)

    def __init__(self, flowsheet_id: int, num_intervals: int = 20, t_min: float = 1) -> None:
        """Load the stream data project attached to ``flowsheet_id``.

        Args:
            flowsheet_id: Primary key of the flowsheet to work on.
            num_intervals: Sampling intervals for stream linearisation.
            t_min: Maximum temperature difference between actual stream
                samples and the linearisation curve.
        """
        self.project = Flowsheet.objects.get(pk=flowsheet_id).StreamDataProject
        self.flowsheet = self.project.flowsheet

        # Sampling intervals for stream linearisation
        self.num_intervals = num_intervals
        # Maximum temperature difference between actual stream samples and linearisation curve
        self.t_min = t_min

    def build_calculate_request(self, excluded_segments: list[int]) -> dict:
        """Assemble the payload for the ``calculate`` endpoint from project data.

        Args:
            excluded_segments: Ids of segments to leave out of the request.
                ``None`` or an empty list excludes nothing.

        Returns:
            A dict with the keys ``streams``, ``utilities``, ``options`` and
            ``zone_tree``.
        """
        # A set gives O(1) membership tests inside the nested loop below.
        excluded = set(excluded_segments or ())

        streams = []
        for stream_data_entry in self.project.StreamDataEntries.all():
            for segment in stream_data_entry.Segments.all():
                if segment.id in excluded:
                    continue
                streams.append({
                    "zone": segment.zone,
                    "name": segment.name,
                    "t_supply": segment.t_supply,
                    "t_target": segment.t_target,
                    "heat_flow": segment.heat_flow,
                    "dt_cont": segment.dt_cont,
                    "htc": segment.htc,
                })

        utilities = [
            {
                "name": utility.name,
                "type": utility.type,
                "t_supply": utility.t_supply,
                "t_target": utility.t_target,
                "heat_flow": utility.heat_flow,
                "dt_cont": utility.dt_cont,
                "htc": utility.htc,
                "price": utility.price,
            }
            for utility in self.project.Inputs.PinchUtilities.all()
        ]

        options = self.project.Options
        return {
            'streams': streams,
            'utilities': utilities,
            'options': {
                # Only properties whose value is exactly True are selected.
                'main': [
                    prop.key
                    for prop in options.selections.containedProperties.all()
                    if prop.get_value() is True
                ],
                'turbine': [
                    {"key": prop.key, "value": prop.get_value()}
                    for prop in options.turbine_options.properties.containedProperties.all()
                ],
            },
            'zone_tree': self.build_zone_structure(),
        }

    def build_zone_structure(self) -> Optional[Dict[str, Any]]:
        """Return the zone hierarchy of the project as a nested dict.

        Every stream data entry contributes a node for its zone. Ancestor
        groups that own no entries themselves are added while walking up the
        parent chain, so the result is always connected to the top-level group.

        Returns:
            ``{"name", "type", "children"}`` for the root group, with the same
            shape recursively in ``children``; ``None`` when no group without
            a parent is found (for example when the project has no entries).
        """
        # Step 1: one node per zone that owns stream data.
        nodes: Dict[Any, Dict[str, Any]] = {}
        for stream_data_entry in self.project.StreamDataEntries.all():
            nodes[stream_data_entry.zone] = {
                "children": [],
                "group": stream_data_entry.group,
            }

        # Step 2: attach every node to its parent, level by level. Parents that
        # were not seen yet are created and processed in the next pass so that
        # intermediate zones without streams are linked to their own parents.
        root_node = None
        pending = list(nodes.values())
        while pending:
            discovered = []
            for node in pending:
                parent_group = node["group"].get_parent_group()
                if not parent_group:
                    # If several parentless groups exist, the last one wins.
                    root_node = node
                    continue
                parent_zone = parent_group.simulationObject.componentName
                parent_node = nodes.get(parent_zone)
                if parent_node is None:
                    parent_node = {"children": [], "group": parent_group}
                    nodes[parent_zone] = parent_node
                    discovered.append(parent_node)
                parent_node["children"].append(node)
            pending = discovered

        # Step 3: format it nicely
        def clean_node(node):
            return {
                "name": node["group"].simulationObject.componentName,
                "type": node["group"].abstractionType,
                "children": [clean_node(child) for child in node["children"]],
            }

        if root_node is None:
            return None
        return clean_node(root_node)

    def build_linearize_request(self, t_h_data, streams_io_props: list[dict], mole_flow: float, ppKey: str) -> dict:
        """Assemble the payload for the ``linearize`` endpoint."""
        return {
            't_h_data': t_h_data,
            't_min': self.t_min,
            'num_intervals': self.num_intervals,
            'streams': streams_io_props,
            'mole_flow': mole_flow,
            'ppKey': ppKey,
        }

    def build_t_h_request(self, streams_io_props: list[dict], mole_flow: float, ppKey: str, prev_states=None) -> dict:
        """Assemble the payload for the ``generate_t_h_curve`` endpoint."""
        return {
            't_min': self.t_min,
            'num_intervals': self.num_intervals,
            'streams': streams_io_props,
            'mole_flow': mole_flow,
            'ppKey': ppKey,
            'prev_states': prev_states,
        }

    def clear_outputs(self) -> None:
        """Remove the previous targets and graph sets from the project."""
        output_owner = self.project.Outputs
        output_owner.targets.all().delete()
        output_owner.graph_sets.all().delete()

    def _post_json(self, endpoint: str, payload: dict) -> Any:
        """POST ``payload`` as JSON to a pinch service endpoint and decode the reply.

        Raises:
            RuntimeError: If the service answers with a status other than 200.
            requests.RequestException: On connection problems or timeouts.
            ValueError: If a 200 response body is not valid JSON.
        """
        # Strip a trailing slash so a configured "http://host/" does not
        # produce "http://host//endpoint".
        base_url = (os.getenv('PINCH_SERVICE_URL') or "http://localhost:8082").rstrip("/")
        result = requests.post(f"{base_url}/{endpoint}", json=payload, timeout=self.REQUEST_TIMEOUT)
        if result.status_code != 200:
            # Error bodies are not guaranteed to be JSON; fall back to raw text
            # so the HTTP status is never hidden behind a decode error.
            try:
                detail = result.json()
            except ValueError:
                detail = result.text
            raise RuntimeError(
                f"Pinch service returned HTTP {result.status_code} for '{endpoint}': {detail}"
            )
        return result.json()

    def run_calculate(self, excluded_segments: list[int]) -> None:
        """Send the project data to the pinch service and store the results.

        Previous outputs are removed only after a response has been received
        and decoded successfully.

        Raises:
            RuntimeError: If building the request, calling the service or
                storing the results fails; the original error is chained.
        """
        try:
            request_data = self.build_calculate_request(excluded_segments)
            response_data = self._post_json("calculate", request_data)
            # NOTE(review): clearing and re-creating outputs is not wrapped in
            # a database transaction here - confirm whether the caller does so.
            self.clear_outputs()
            self.serialize_return_data(response_data)
        except Exception as e:
            self._logger.exception("Error during calculation")
            raise RuntimeError("Calculation error occurred.") from e

    def run_linearize(self, t_h_data, streams_io_props: list[dict], mole_flow: float, ppKey: str, **_):
        """Linearize a stream curve through the pinch service.

        Extra keyword arguments are accepted and ignored.

        Returns:
            The ``streams`` entry of the service response.

        Raises:
            RuntimeError: If the request fails; the original error is chained.
        """
        try:
            request_data = self.build_linearize_request(t_h_data, streams_io_props, mole_flow, ppKey)
            response_data = self._post_json("linearize", request_data)
            return self.get_linear_streams(response_data)
        except Exception as e:
            self._logger.exception("Error during calculation")
            raise RuntimeError("Calculation error occurred.") from e

    def serialize_return_data(self, response_data):
        """Convert a calculation response to db entries, including nested objects.

        ``response_data`` is read only; it is not modified.

        Raises:
            RuntimeError: If any part of the conversion fails; the original
                error is chained.
        """
        try:
            output_owner = self.project.Outputs
            targets = response_data.get('targets', None)
            graphs = response_data.get('graphs', None)

            if targets:
                self._store_targets(output_owner, targets)
            if graphs:
                self._store_graphs(output_owner, graphs)
        except Exception as e:
            raise RuntimeError("Serialization error occurred.") from e

    def _store_targets(self, output_owner, targets) -> None:
        """Create a TargetSummary per entry plus its pinch temperature and utility summaries."""
        heat_suppliers = []
        heat_receivers = []
        for entry in targets:
            # Work on a shallow copy so the caller's response is left intact.
            fields = dict(entry)
            # Pop data that should not be passed to the TargetSummary constructor.
            temp_pinch_data = fields.pop('temp_pinch', None)
            hot_utilities = fields.pop('hot_utilities', None) or []
            cold_utilities = fields.pop('cold_utilities', None) or []

            temp_pinch = None
            if temp_pinch_data:
                temp_pinch = PinchTemp.objects.create(**temp_pinch_data, flowsheet=self.flowsheet)

            # Saved one by one: the utility summaries below reference the
            # target, so it needs a primary key before they are created.
            target = TargetSummary(
                output_owner=output_owner,
                temp_pinch=temp_pinch,
                **fields,
                flowsheet=self.flowsheet
            )
            target.save()

            heat_suppliers.extend(
                HeatSupplierUtilitySummary(summary_owner=target, **supplier_utility, flowsheet=self.flowsheet)
                for supplier_utility in hot_utilities
            )
            heat_receivers.extend(
                HeatReceiverUtilitySummary(summary_owner=target, **receiver_utility, flowsheet=self.flowsheet)
                for receiver_utility in cold_utilities
            )

        HeatSupplierUtilitySummary.objects.bulk_create(heat_suppliers)
        HeatReceiverUtilitySummary.objects.bulk_create(heat_receivers)

    def _store_graphs(self, output_owner, graphs) -> None:
        """Create graph sets with their graphs, curves and data points."""
        graph_set_list = []
        graph_list = []
        curve_list = []
        data_point_list = []

        for graph_set_data in graphs.values():
            graph_set = PinchGraphSet(
                output_owner=output_owner,
                name=graph_set_data.get('name'),
                flowsheet=self.flowsheet
            )
            graph_set_list.append(graph_set)

            for graph_data in graph_set_data.get('graphs', []):
                graph = PinchGraph(
                    graph_set=graph_set,
                    name=graph_data.get('name'),
                    type=graph_data.get('type', pinchEnums.GraphType.CC),
                    flowsheet=self.flowsheet
                )
                graph_list.append(graph)

                for segment_data in graph_data.get('segments', []):
                    curve = PinchCurve(
                        graph=graph,
                        title=segment_data.get('title'),
                        colour=segment_data.get('colour', pinchEnums.LineColour.Hot),
                        arrow=segment_data.get('arrow', pinchEnums.ArrowHead.NO_ARROW),
                        flowsheet=self.flowsheet
                    )
                    curve_list.append(curve)

                    data_point_list.extend(
                        GraphDataPoint(
                            curve=curve,
                            x=point.get('x'),
                            y=point.get('y'),
                            flowsheet=self.flowsheet
                        )
                        for point in segment_data.get('data_points', [])
                    )

        # Parents are created before children so that each level can reference
        # the rows of the level above.
        PinchGraphSet.objects.bulk_create(graph_set_list)
        PinchGraph.objects.bulk_create(graph_list)
        PinchCurve.objects.bulk_create(curve_list)
        GraphDataPoint.objects.bulk_create(data_point_list)

    def get_linear_streams(self, response_data):
        """Return the ``streams`` entry of a linearize response."""
        return response_data['streams']

    def run_get_t_h_data(self, streams_io_props: list[dict], mole_flow: float, ppKey: str, prev_states=None, **_):
        """Get t_h data for streams from the pinch service.

        Extra keyword arguments are accepted and ignored.

        Returns:
            The decoded JSON response of the ``generate_t_h_curve`` endpoint.

        Raises:
            RuntimeError: If the request fails; the original error is chained.
        """
        try:
            request_data = self.build_t_h_request(streams_io_props, mole_flow, ppKey, prev_states)
            return self._post_json("generate_t_h_curve", request_data)
        except Exception as e:
            self._logger.exception("Error during calculation")
            raise RuntimeError("Calculation error occurred.") from e