Coverage for backend/pinch_factory/pinch_factory.py: 12%
160 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1import json
2import os
3import traceback
4from typing import Any, Dict
5import requests
7from PinchAnalysis.models.OutputModels import GraphDataPoint, HeatReceiverUtilitySummary, HeatSupplierUtilitySummary, PinchCurve, PinchGraph, PinchGraphSet, PinchTemp, TargetSummary
8from PinchAnalysis.models.StreamDataProject import StreamDataProject
9from PinchAnalysis.models.InputModels import PinchUtility, Segment
10from core.auxiliary.enums.generalEnums import AbstractionType
11from core.auxiliary.enums import pinchEnums
12from PinchAnalysis.serializers.PinchInputSerializers import SegmentSerializer
13from core.auxiliary.models.Flowsheet import Flowsheet
14class PinchFactory:
15 def __init__(self, flowsheet_id: int, num_intervals: int = 50, t_min: float = 1) -> None:
16 project_id = Flowsheet.objects.get(pk=flowsheet_id).StreamDataProject.pk
17 self.project = StreamDataProject.objects.get(pk=project_id)
18 self.flowsheet = self.project.flowsheet
19 self.num_intervals = num_intervals
20 self.t_min = t_min
22 def build_calculate_request(self, excluded_segments: list[int]) -> dict:
23 """
24 Extract and prepare inputs from the project data.
25 """
26 request_data = {}
27 request_data['streams'] = []
28 request_data['utilities'] = []
29 for segment in Segment.objects.all():
30 if segment.id in excluded_segments:
31 continue
32 request_data['streams'].append({
33 "zone": segment.zone,
34 "name": segment.name,
35 "t_supply": segment.t_supply,
36 "t_target": segment.t_target,
37 "heat_flow": segment.heat_flow,
38 "dt_cont": segment.dt_cont,
39 "htc": segment.htc,
40 })
41 for utility in PinchUtility.objects.all():
42 request_data['utilities'].append({
43 "name": utility.name,
44 "type": utility.type,
45 "t_supply": utility.t_supply,
46 "t_target": utility.t_target,
47 "heat_flow": utility.heat_flow,
48 "dt_cont": utility.dt_cont,
49 "htc": utility.htc,
50 "price": utility.price,
51 })
54 #= SegmentSerializer(self.project.Inputs.PinchUtilities.all(), many=True).data
56 request_data['options'] = {
57 'main': [prop.key for prop in self.project.Options.selections.containedProperties.all() if prop.get_value() is True],
58 'turbine': [{"key": prop.key, "value": prop.get_value()} for prop in self.project.Options.turbine_options.properties.containedProperties.all()]
59 }
60 request_data['zone_tree'] = self.build_zone_structure()
61 return request_data
63 def build_zone_structure(self) -> list:
64 # Step 1: Collect all groups related to the StreamDataEntries
65 all_groups = {}
66 for stream_data_entry in self.project.StreamDataEntries.all():
67 group = stream_data_entry.group
68 zone = stream_data_entry.zone
69 all_groups[zone] = {
70 "children": [],
71 "group": group,
72 }
73 # Step 2: Organize into tree
74 root_node = None
75 parent_zones = {}
76 for zone_data in all_groups.values():
77 group = zone_data["group"]
78 parent_group = group.get_parent_group()
79 if parent_group:
80 parent_zone = parent_group.simulationObject.componentName
81 if parent_zone in all_groups:
82 all_groups[parent_zone]["children"].append(zone_data)
83 else:
84 if parent_zone not in parent_zones:
85 parent_zones[parent_zone] = {
86 "children": [],
87 "group": parent_group,
88 }
89 parent_zones[parent_zone]["children"].append(zone_data)
90 for parent_zone_name, parent_zone_data in parent_zones.items():
91 all_groups[parent_zone_name] = parent_zone_data
92 for zone_data in all_groups.values():
93 group = zone_data["group"]
94 parent_group = group.get_parent_group()
95 if not parent_group:
96 root_node = zone_data
98 # Step 3: Format it nicely
99 def clean_node(node):
100 return {
101 "name": node["group"].simulationObject.componentName,
102 "type": node["group"].abstractionType,
103 "children": [clean_node(child) for child in node["children"]],
104 }
106 if root_node is None:
107 return None
108 return clean_node(root_node)
111 def build_linearize_request(self, t_h_data, data: list[dict], mole_flow: float, ppKey: str) -> dict:
112 return (
113 {
114 't_h_data': t_h_data,
115 't_min': self.t_min,
116 'num_intervals': self.num_intervals,
117 'streams': data,
118 'mole_flow': mole_flow,
119 'ppKey': ppKey
120 }
121 )
122 def build_t_h_request(self, data: list[dict], mole_flow: float, ppKey: str) -> dict:
123 return (
124 {
125 't_min': self.t_min,
126 'num_intervals': self.num_intervals,
127 'streams': data,
128 'mole_flow': mole_flow,
129 'ppKey': ppKey
130 }
131 )
133 def clear_outputs(self) -> None:
134 """
135 Removes the previous outputs from the project
136 """
137 output_owner = self.project.Outputs
138 output_owner.targets.all().delete()
139 output_owner.graph_sets.all().delete()
141 return
143 def run_calculate(self, excluded_segments: list[int]) -> Dict[str, Any]:
144 """
145 Format data and send request to pinch service
146 """
147 try:
148 request_data = self.build_calculate_request(excluded_segments)
149 # print(request_data)
150 url = (os.getenv('PINCH_SERVICE_URL') or "http://localhost:8082") + "/" + "calculate"
151 result = requests.post(url, json=request_data)
152 if result.status_code != 200:
153 raise Exception(result.json())
154 self.clear_outputs()
155 response_data = result.json()
156 self.serialize_return_data(response_data)
158 except Exception as e:
159 print("Error during calculation:", e)
160 print("Traceback:", traceback.format_exc())
161 raise RuntimeError("Calculation error occurred.") from e
163 def run_linearize(self, t_h_data, data: list[dict], mole_flow: float, ppKey: str):
164 """
165 Linearizes a stream curve
166 """
167 try:
168 url = (os.getenv('PINCH_SERVICE_URL') or "http://localhost:8082") + "/" + "linearize"
169 request_data = self.build_linearize_request(t_h_data, data, mole_flow, ppKey)
170 # print(request_data)
171 result = requests.post(url, json=request_data)
172 if result.status_code != 200:
173 raise Exception(result.json())
174 response_data = result.json()
175 return self.get_linear_streams(response_data)
176 except Exception as e:
177 print("Error during calculation:", e)
178 print("Traceback:", traceback.format_exc())
179 raise RuntimeError("Calculation error occurred.") from e
181 def serialize_return_data(self, response_data):
182 """
183 Converts output data to db entries, including nested objects.
184 """
185 try:
186 output_owner = self.project.Outputs
187 targets = response_data.get('targets', None)
188 graphs = response_data.get('graphs', None)
190 # Target Objects
191 if targets:
192 target_list = []
193 heat_suppliers = []
194 heat_receivers = []
195 for entry in targets:
196 # Pop data that should not be included in the target creation
197 temp_pinch_data = entry.pop('temp_pinch', None)
198 hot_utilities = entry.pop('hot_utilities', [])
199 cold_utilities = entry.pop('cold_utilities', [])
201 # print('temp_pinch_data', temp_pinch_data)
203 temp_pinch = PinchTemp.objects.create(**temp_pinch_data, flowsheet=self.flowsheet) if temp_pinch_data else None
207 # Create TargetSummary
208 target = TargetSummary(
209 output_owner=output_owner,
210 temp_pinch=temp_pinch,
211 **entry,
212 flowsheet=self.flowsheet
213 )
214 # print(target)
215 target.save()
217 for supplier_utility in hot_utilities:
218 heat_suppliers.append(HeatSupplierUtilitySummary(
219 summary_owner=target,
220 **supplier_utility,
221 flowsheet=self.flowsheet
222 ))
224 for receiver_utility in cold_utilities:
225 heat_receivers.append(HeatReceiverUtilitySummary(
226 summary_owner=target,
227 **receiver_utility,
228 flowsheet=self.flowsheet
229 ))
231 target_list.append(target)
233 # Bulk create objects
234 TargetSummary.objects.bulk_create(target_list, ignore_conflicts=True)
235 HeatSupplierUtilitySummary.objects.bulk_create(heat_suppliers)
236 HeatReceiverUtilitySummary.objects.bulk_create(heat_receivers)
238 # Graphs
239 if graphs:
240 graph_set_list = []
241 graph_list = []
242 curve_list = []
243 data_point_list = []
245 for key, graph_set_data in graphs.items():
246 graph_set = PinchGraphSet(output_owner=output_owner, name=graph_set_data.get('name'), flowsheet=self.flowsheet)
247 graph_set_list.append(graph_set)
249 # Create nested graphs
250 for graph_data in graph_set_data.get('graphs', []):
251 graph = PinchGraph(
252 graph_set=graph_set,
253 name=graph_data.get('name'),
254 type=graph_data.get('type', pinchEnums.GraphType.CC),
255 flowsheet=self.flowsheet
256 )
257 graph_list.append(graph)
259 # Create nested curves
260 for segment_data in graph_data.get('segments', []):
261 curve = PinchCurve(
262 graph=graph,
263 title=segment_data.get('title'),
264 colour=segment_data.get('colour', pinchEnums.LineColour.Hot),
265 arrow=segment_data.get('arrow', pinchEnums.ArrowHead.NO_ARROW),
266 flowsheet=self.flowsheet
267 )
268 curve_list.append(curve)
270 for point in segment_data.get('data_points', []):
271 data_point = GraphDataPoint(
272 curve=curve,
273 x=point.get('x'),
274 y=point.get('y'),
275 flowsheet=self.flowsheet
276 )
277 data_point_list.append(data_point)
279 # Bulk create the objects
280 PinchGraphSet.objects.bulk_create(graph_set_list)
281 PinchGraph.objects.bulk_create(graph_list)
282 PinchCurve.objects.bulk_create(curve_list)
283 GraphDataPoint.objects.bulk_create(data_point_list)
285 except Exception as e:
286 raise RuntimeError("Serialization error occurred.") from e
288 def get_linear_streams(self, response_data):
289 return response_data['streams']
291 def run_get_t_h_data(self, data: list[dict], mole_flow: float, ppKey: str):
292 """
293 Get t_h data from streams
294 """
295 try:
296 url = (os.getenv('PINCH_SERVICE_URL') or "http://localhost:8082") + "/" + "generate_t_h_curve"
297 request_data = self.build_t_h_request(data, mole_flow, ppKey)
298 result = requests.post(url, json=request_data)
299 if result.status_code != 200:
300 raise Exception(result.json())
301 response_data = result.json()
302 return response_data
303 except Exception as e:
304 print("Error during calculation:", e)
305 print("Traceback:", traceback.format_exc())
306 raise RuntimeError("Calculation error occurred.") from e