Coverage for backend/pinch_factory/pinch_factory.py: 12% (160 statements) — coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

import json
import os
import traceback
from typing import Any, Dict, Optional

import requests

from PinchAnalysis.models.InputModels import PinchUtility, Segment
from PinchAnalysis.models.OutputModels import GraphDataPoint, HeatReceiverUtilitySummary, HeatSupplierUtilitySummary, PinchCurve, PinchGraph, PinchGraphSet, PinchTemp, TargetSummary
from PinchAnalysis.models.StreamDataProject import StreamDataProject
from PinchAnalysis.serializers.PinchInputSerializers import SegmentSerializer
from core.auxiliary.enums import pinchEnums
from core.auxiliary.enums.generalEnums import AbstractionType
from core.auxiliary.models.Flowsheet import Flowsheet

class PinchFactory:
    """
    Builds request payloads for the external pinch-analysis HTTP service and
    persists the service's responses back onto the owning StreamDataProject.
    """

    # Fallback base URL when the PINCH_SERVICE_URL env var is unset or empty.
    DEFAULT_SERVICE_URL = "http://localhost:8082"
    # Seconds before a pinch-service HTTP call is abandoned.  The original
    # code passed no timeout, so a hung service blocked the caller forever.
    REQUEST_TIMEOUT = 300

    def __init__(self, flowsheet_id: int, num_intervals: int = 50, t_min: float = 1) -> None:
        """
        :param flowsheet_id: pk of the Flowsheet whose StreamDataProject is analysed
        :param num_intervals: discretisation interval count for curve generation
        :param t_min: minimum temperature approach (delta-T min) for the analysis
        """
        project_id = Flowsheet.objects.get(pk=flowsheet_id).StreamDataProject.pk
        self.project = StreamDataProject.objects.get(pk=project_id)
        self.flowsheet = self.project.flowsheet
        self.num_intervals = num_intervals
        self.t_min = t_min

    def _service_url(self, endpoint: str) -> str:
        """Return the full URL for *endpoint* on the pinch service.

        Extracted helper: the base-URL expression was duplicated verbatim in
        every run_* method.
        """
        return (os.getenv('PINCH_SERVICE_URL') or self.DEFAULT_SERVICE_URL) + "/" + endpoint

    def build_calculate_request(self, excluded_segments: list[int]) -> dict:
        """
        Extract and prepare inputs from the project data.

        :param excluded_segments: Segment pks to leave out of the request
        :returns: dict with 'streams', 'utilities', 'options' and 'zone_tree' keys
        """
        # NOTE(review): these querysets iterate *all* Segment / PinchUtility
        # rows in the database, not just this project's (cf. the serializer
        # call that was commented out here, which used
        # self.project.Inputs.PinchUtilities) -- confirm this is intended.
        request_data: dict = {'streams': [], 'utilities': []}
        for segment in Segment.objects.all():
            if segment.id in excluded_segments:
                continue
            request_data['streams'].append({
                "zone": segment.zone,
                "name": segment.name,
                "t_supply": segment.t_supply,
                "t_target": segment.t_target,
                "heat_flow": segment.heat_flow,
                "dt_cont": segment.dt_cont,
                "htc": segment.htc,
            })
        for utility in PinchUtility.objects.all():
            request_data['utilities'].append({
                "name": utility.name,
                "type": utility.type,
                "t_supply": utility.t_supply,
                "t_target": utility.t_target,
                "heat_flow": utility.heat_flow,
                "dt_cont": utility.dt_cont,
                "htc": utility.htc,
                "price": utility.price,
            })

        request_data['options'] = {
            # Keys of the boolean main-option properties that are switched on.
            'main': [prop.key for prop in self.project.Options.selections.containedProperties.all() if prop.get_value() is True],
            'turbine': [{"key": prop.key, "value": prop.get_value()} for prop in self.project.Options.turbine_options.properties.containedProperties.all()]
        }
        request_data['zone_tree'] = self.build_zone_structure()
        return request_data

    def build_zone_structure(self) -> Optional[dict]:
        """
        Build the zone hierarchy as a nested dict of {'name', 'type', 'children'}.

        :returns: the root node of the tree, or None when no root zone exists.
            (The original annotation said ``list``; the method has always
            returned a dict or None.)
        """
        # Step 1: Collect all groups related to the StreamDataEntries
        all_groups = {}
        for stream_data_entry in self.project.StreamDataEntries.all():
            group = stream_data_entry.group
            zone = stream_data_entry.zone
            all_groups[zone] = {
                "children": [],
                "group": group,
            }
        # Step 2: Organize into tree.  Zones whose parent is not itself a
        # stream-data zone get synthesized parent nodes (parent_zones).
        root_node = None
        parent_zones = {}
        for zone_data in all_groups.values():
            group = zone_data["group"]
            parent_group = group.get_parent_group()
            if parent_group:
                parent_zone = parent_group.simulationObject.componentName
                if parent_zone in all_groups:
                    all_groups[parent_zone]["children"].append(zone_data)
                else:
                    if parent_zone not in parent_zones:
                        parent_zones[parent_zone] = {
                            "children": [],
                            "group": parent_group,
                        }
                    parent_zones[parent_zone]["children"].append(zone_data)
        for parent_zone_name, parent_zone_data in parent_zones.items():
            all_groups[parent_zone_name] = parent_zone_data
        # A node with no parent group is the root; if several exist the last
        # one iterated wins (pre-existing behavior).
        for zone_data in all_groups.values():
            group = zone_data["group"]
            parent_group = group.get_parent_group()
            if not parent_group:
                root_node = zone_data

        # Step 3: Format it nicely
        def clean_node(node):
            return {
                "name": node["group"].simulationObject.componentName,
                "type": node["group"].abstractionType,
                "children": [clean_node(child) for child in node["children"]],
            }

        if root_node is None:
            return None
        return clean_node(root_node)

    def build_linearize_request(self, t_h_data, data: list[dict], mole_flow: float, ppKey: str) -> dict:
        """Assemble the payload for the /linearize endpoint."""
        return {
            't_h_data': t_h_data,
            't_min': self.t_min,
            'num_intervals': self.num_intervals,
            'streams': data,
            'mole_flow': mole_flow,
            'ppKey': ppKey
        }

    def build_t_h_request(self, data: list[dict], mole_flow: float, ppKey: str) -> dict:
        """Assemble the payload for the /generate_t_h_curve endpoint."""
        return {
            't_min': self.t_min,
            'num_intervals': self.num_intervals,
            'streams': data,
            'mole_flow': mole_flow,
            'ppKey': ppKey
        }

    def clear_outputs(self) -> None:
        """
        Removes the previous outputs from the project.
        """
        output_owner = self.project.Outputs
        output_owner.targets.all().delete()
        output_owner.graph_sets.all().delete()

    def run_calculate(self, excluded_segments: list[int]) -> None:
        """
        Format data and send request to the pinch service's /calculate
        endpoint, then persist the response.

        (The original annotation claimed ``Dict[str, Any]`` but the method has
        always returned None.)

        :raises RuntimeError: wrapping any request or serialization failure
        """
        try:
            request_data = self.build_calculate_request(excluded_segments)
            result = requests.post(self._service_url("calculate"), json=request_data,
                                   timeout=self.REQUEST_TIMEOUT)
            if result.status_code != 200:
                raise Exception(result.json())
            # Only discard the previous outputs once the new run succeeded.
            self.clear_outputs()
            self.serialize_return_data(result.json())
        except Exception as e:
            print("Error during calculation:", e)
            print("Traceback:", traceback.format_exc())
            raise RuntimeError("Calculation error occurred.") from e

    def run_linearize(self, t_h_data, data: list[dict], mole_flow: float, ppKey: str):
        """
        Linearizes a stream curve via the service's /linearize endpoint.

        :raises RuntimeError: wrapping any request failure
        """
        try:
            request_data = self.build_linearize_request(t_h_data, data, mole_flow, ppKey)
            result = requests.post(self._service_url("linearize"), json=request_data,
                                   timeout=self.REQUEST_TIMEOUT)
            if result.status_code != 200:
                raise Exception(result.json())
            return self.get_linear_streams(result.json())
        except Exception as e:
            print("Error during calculation:", e)
            print("Traceback:", traceback.format_exc())
            raise RuntimeError("Calculation error occurred.") from e

    def serialize_return_data(self, response_data):
        """
        Converts output data to db entries, including nested objects.

        :param response_data: JSON body from /calculate; keys 'targets' and
            'graphs' are both optional.
        :raises RuntimeError: wrapping any error during persistence
        """
        try:
            output_owner = self.project.Outputs
            targets = response_data.get('targets', None)
            graphs = response_data.get('graphs', None)

            # Target Objects
            if targets:
                heat_suppliers = []
                heat_receivers = []
                for entry in targets:
                    # Pop data that should not be included in the target creation
                    temp_pinch_data = entry.pop('temp_pinch', None)
                    hot_utilities = entry.pop('hot_utilities', [])
                    cold_utilities = entry.pop('cold_utilities', [])

                    temp_pinch = PinchTemp.objects.create(**temp_pinch_data, flowsheet=self.flowsheet) if temp_pinch_data else None

                    # Create and save the TargetSummary immediately: the
                    # utility summaries below need its pk as their FK.
                    # (A later bulk_create of the already-saved targets with
                    # ignore_conflicts=True was a redundant no-op; removed.)
                    target = TargetSummary(
                        output_owner=output_owner,
                        temp_pinch=temp_pinch,
                        **entry,
                        flowsheet=self.flowsheet
                    )
                    target.save()

                    for supplier_utility in hot_utilities:
                        heat_suppliers.append(HeatSupplierUtilitySummary(
                            summary_owner=target,
                            **supplier_utility,
                            flowsheet=self.flowsheet
                        ))

                    for receiver_utility in cold_utilities:
                        heat_receivers.append(HeatReceiverUtilitySummary(
                            summary_owner=target,
                            **receiver_utility,
                            flowsheet=self.flowsheet
                        ))

                # Bulk create the utility summaries
                HeatSupplierUtilitySummary.objects.bulk_create(heat_suppliers)
                HeatReceiverUtilitySummary.objects.bulk_create(heat_receivers)

            # Graphs
            if graphs:
                graph_set_list = []
                graph_list = []
                curve_list = []
                data_point_list = []

                for key, graph_set_data in graphs.items():
                    graph_set = PinchGraphSet(output_owner=output_owner, name=graph_set_data.get('name'), flowsheet=self.flowsheet)
                    graph_set_list.append(graph_set)

                    # Create nested graphs
                    for graph_data in graph_set_data.get('graphs', []):
                        graph = PinchGraph(
                            graph_set=graph_set,
                            name=graph_data.get('name'),
                            type=graph_data.get('type', pinchEnums.GraphType.CC),
                            flowsheet=self.flowsheet
                        )
                        graph_list.append(graph)

                        # Create nested curves
                        for segment_data in graph_data.get('segments', []):
                            curve = PinchCurve(
                                graph=graph,
                                title=segment_data.get('title'),
                                colour=segment_data.get('colour', pinchEnums.LineColour.Hot),
                                arrow=segment_data.get('arrow', pinchEnums.ArrowHead.NO_ARROW),
                                flowsheet=self.flowsheet
                            )
                            curve_list.append(curve)

                            for point in segment_data.get('data_points', []):
                                data_point = GraphDataPoint(
                                    curve=curve,
                                    x=point.get('x'),
                                    y=point.get('y'),
                                    flowsheet=self.flowsheet
                                )
                                data_point_list.append(data_point)

                # Bulk create the objects, parents before children.
                # NOTE(review): the child rows reference parent instances that
                # were unsaved when assigned; this relies on the pks being
                # populated by the parent bulk_create on this DB backend --
                # confirm against the deployment database.
                PinchGraphSet.objects.bulk_create(graph_set_list)
                PinchGraph.objects.bulk_create(graph_list)
                PinchCurve.objects.bulk_create(curve_list)
                GraphDataPoint.objects.bulk_create(data_point_list)

        except Exception as e:
            raise RuntimeError("Serialization error occurred.") from e

    def get_linear_streams(self, response_data):
        """Extract the linearized streams from a /linearize response body."""
        return response_data['streams']

    def run_get_t_h_data(self, data: list[dict], mole_flow: float, ppKey: str):
        """
        Get t_h data from streams via the /generate_t_h_curve endpoint.

        :raises RuntimeError: wrapping any request failure
        """
        try:
            request_data = self.build_t_h_request(data, mole_flow, ppKey)
            result = requests.post(self._service_url("generate_t_h_curve"), json=request_data,
                                   timeout=self.REQUEST_TIMEOUT)
            if result.status_code != 200:
                raise Exception(result.json())
            return result.json()
        except Exception as e:
            print("Error during calculation:", e)
            print("Traceback:", traceback.format_exc())
            raise RuntimeError("Calculation error occurred.") from e