Coverage for backend/django/pinch_factory/pinch_factory.py: 85%
159 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
import json
import logging
import os
import traceback
from typing import Any, Dict, Optional

import requests

from PinchAnalysis.models.OutputModels import GraphDataPoint, HeatReceiverUtilitySummary, HeatSupplierUtilitySummary, PinchCurve, PinchGraph, PinchGraphSet, PinchTemp, TargetSummary
from PinchAnalysis.models.StreamDataProject import StreamDataProject
from PinchAnalysis.models.InputModels import PinchUtility, Segment
from core.auxiliary.enums.generalEnums import AbstractionType
from core.auxiliary.enums import pinchEnums
from PinchAnalysis.serializers.PinchInputSerializers import SegmentSerializer
from core.auxiliary.models.Flowsheet import Flowsheet
logger = logging.getLogger(__name__)


class PinchFactory:
    """
    Builds request payloads for the pinch service, sends them, and stores
    the calculation results on the flowsheet's stream-data project.
    """

    # Base URL used when the PINCH_SERVICE_URL environment variable is unset or empty.
    DEFAULT_SERVICE_URL = "http://localhost:8082"

    def __init__(self, flowsheet_id: int, num_intervals: int = 20, t_min: float = 1) -> None:
        """
        Load the stream-data project attached to the given flowsheet.

        Args:
            flowsheet_id: Primary key of the Flowsheet to work on.
            num_intervals: Sampling intervals for stream linearisation.
            t_min: Maximum temperature difference between actual stream
                samples and the linearisation curve.
        """
        self.project = Flowsheet.objects.get(pk=flowsheet_id).StreamDataProject
        self.flowsheet = self.project.flowsheet
        # Sampling intervals for stream linearisation
        self.num_intervals = num_intervals
        # Maximum temperature difference between actual stream samples and linearisation curve
        self.t_min = t_min

    # ------------------------------------------------------------------
    # Request builders
    # ------------------------------------------------------------------
    def build_calculate_request(self, excluded_segments: list[int]) -> dict:
        """
        Extract and prepare inputs from the project data.

        Args:
            excluded_segments: Ids of segments to leave out of the request.
                ``None`` is treated as "exclude nothing".

        Returns:
            A dict with the keys ``streams``, ``utilities``, ``options`` and
            ``zone_tree``.
        """
        # A set gives O(1) membership tests inside the nested loop below.
        excluded = set(excluded_segments or ())

        streams = []
        for stream_data_entry in self.project.StreamDataEntries.all():
            for segment in stream_data_entry.Segments.all():
                if segment.id in excluded:
                    continue
                streams.append({
                    "zone": segment.zone,
                    "name": segment.name,
                    "t_supply": segment.t_supply,
                    "t_target": segment.t_target,
                    "heat_flow": segment.heat_flow,
                    "dt_cont": segment.dt_cont,
                    "htc": segment.htc,
                })

        utilities = [
            {
                "name": utility.name,
                "type": utility.type,
                "t_supply": utility.t_supply,
                "t_target": utility.t_target,
                "heat_flow": utility.heat_flow,
                "dt_cont": utility.dt_cont,
                "htc": utility.htc,
                "price": utility.price,
            }
            for utility in self.project.Inputs.PinchUtilities.all()
        ]

        project_options = self.project.Options
        options = {
            # Only options whose value is exactly True are forwarded.
            'main': [
                prop.key
                for prop in project_options.selections.containedProperties.all()
                if prop.get_value() is True
            ],
            'turbine': [
                {"key": prop.key, "value": prop.get_value()}
                for prop in project_options.turbine_options.properties.containedProperties.all()
            ],
        }

        return {
            'streams': streams,
            'utilities': utilities,
            'options': options,
            'zone_tree': self.build_zone_structure(),
        }

    def build_zone_structure(self) -> Optional[Dict[str, Any]]:
        """
        Build the nested zone tree for the project's stream data entries.

        Every stream data entry contributes a node keyed by its zone. Nodes
        are linked to their parent by the parent group's component name, and
        any ancestor that is not itself a stream zone is created on demand
        all the way up to a group without a parent.

        Returns:
            The root node as ``{"name", "type", "children"}`` (children use
            the same shape), or ``None`` when no parentless group is found.
            If several groups have no parent, the last one visited wins.
        """
        # Step 1: Collect all groups related to the StreamDataEntries
        nodes: Dict[Any, Dict[str, Any]] = {}
        for stream_data_entry in self.project.StreamDataEntries.all():
            nodes[stream_data_entry.zone] = {
                "children": [],
                "group": stream_data_entry.group,
            }

        # Step 2: Organize into tree.
        # The work list grows while it is walked, so ancestors created on
        # demand are linked to *their* parents too instead of being left
        # dangling (which previously made the whole tree resolve to None).
        work_list = list(nodes.values())
        position = 0
        root_node = None
        while position < len(work_list):
            node = work_list[position]
            position += 1

            parent_group = node["group"].get_parent_group()
            if not parent_group:
                root_node = node
                continue

            parent_name = parent_group.simulationObject.componentName
            parent_node = nodes.get(parent_name)
            if parent_node is None:
                parent_node = {"children": [], "group": parent_group}
                nodes[parent_name] = parent_node
                work_list.append(parent_node)
            parent_node["children"].append(node)

        # Step 3: Format it nicely
        def clean_node(node):
            return {
                "name": node["group"].simulationObject.componentName,
                "type": node["group"].abstractionType,
                "children": [clean_node(child) for child in node["children"]],
            }

        if root_node is None:
            return None
        return clean_node(root_node)

    def build_linearize_request(self, t_h_data, streams_io_props: list[dict], mole_flow: float, ppKey: str) -> dict:
        """Return the payload for the ``linearize`` endpoint."""
        return {
            't_h_data': t_h_data,
            't_min': self.t_min,
            'num_intervals': self.num_intervals,
            'streams': streams_io_props,
            'mole_flow': mole_flow,
            'ppKey': ppKey,
        }

    def build_t_h_request(self, streams_io_props: list[dict], mole_flow: float, ppKey: str, prev_states = None) -> dict:
        """Return the payload for the ``generate_t_h_curve`` endpoint."""
        return {
            't_min': self.t_min,
            'num_intervals': self.num_intervals,
            'streams': streams_io_props,
            'mole_flow': mole_flow,
            'ppKey': ppKey,
            'prev_states': prev_states,
        }

    # ------------------------------------------------------------------
    # Service calls
    # ------------------------------------------------------------------
    def _post_to_service(self, endpoint: str, payload: dict) -> Any:
        """
        POST ``payload`` as JSON to ``endpoint`` on the pinch service.

        Returns:
            The decoded JSON body of a 200 response.

        Raises:
            Exception: When the status code is not 200. The argument is the
                decoded JSON body, or the raw text if the body is not JSON.
        """
        base_url = os.getenv('PINCH_SERVICE_URL') or self.DEFAULT_SERVICE_URL
        url = base_url + "/" + endpoint
        # NOTE(review): no timeout is passed, so a stalled service blocks the
        # caller indefinitely. Choose a limit that suits long calculations.
        result = requests.post(url, json=payload)
        if result.status_code != 200:
            try:
                detail = result.json()
            except ValueError:
                # Error bodies are not guaranteed to be JSON; keep the raw text
                # so the real failure is not masked by a decode error.
                detail = result.text
            raise Exception(detail)
        return result.json()

    def clear_outputs(self) -> None:
        """
        Removes the previous outputs from the project
        """
        output_owner = self.project.Outputs
        output_owner.targets.all().delete()
        output_owner.graph_sets.all().delete()

    def run_calculate(self, excluded_segments: list[int]) -> None:
        """
        Format data and send request to pinch service, then replace the
        project's stored outputs with the returned results.

        Raises:
            RuntimeError: On any failure; the original error is chained.
        """
        try:
            request_data = self.build_calculate_request(excluded_segments)
            # Decode the response before clearing anything so an unusable
            # reply does not wipe the previously stored outputs.
            response_data = self._post_to_service("calculate", request_data)
            self.clear_outputs()
            self.serialize_return_data(response_data)
        except Exception as e:
            logger.exception("Error during calculation: %s", e)
            raise RuntimeError("Calculation error occurred.") from e

    def run_linearize(self, t_h_data, streams_io_props: list[dict], mole_flow: float, ppKey: str, **_):
        """
        Linearizes a stream curve

        Returns:
            The ``streams`` entry of the service response.

        Raises:
            RuntimeError: On any failure; the original error is chained.
        """
        try:
            request_data = self.build_linearize_request(t_h_data, streams_io_props, mole_flow, ppKey)
            response_data = self._post_to_service("linearize", request_data)
            return self.get_linear_streams(response_data)
        except Exception as e:
            logger.exception("Error during calculation: %s", e)
            raise RuntimeError("Calculation error occurred.") from e

    def run_get_t_h_data(self, streams_io_props: list[dict], mole_flow: float, ppKey: str, prev_states = None, **_):
        """
        Get t_h data from streams

        Returns:
            The decoded JSON response of the ``generate_t_h_curve`` endpoint.

        Raises:
            RuntimeError: On any failure; the original error is chained.
        """
        try:
            request_data = self.build_t_h_request(streams_io_props, mole_flow, ppKey, prev_states)
            return self._post_to_service("generate_t_h_curve", request_data)
        except Exception as e:
            logger.exception("Error during calculation: %s", e)
            raise RuntimeError("Calculation error occurred.") from e

    # ------------------------------------------------------------------
    # Persistence of results
    # ------------------------------------------------------------------
    def serialize_return_data(self, response_data):
        """
        Converts output data to db entries, including nested objects.

        Args:
            response_data: Mapping that may contain ``targets`` (a list of
                dicts) and ``graphs`` (a dict of graph-set dicts). It is not
                modified.

        Raises:
            RuntimeError: On any failure; the original error is chained.
        """
        try:
            output_owner = self.project.Outputs
            targets = response_data.get('targets', None)
            graphs = response_data.get('graphs', None)

            if targets:
                self._save_targets(output_owner, targets)
            if graphs:
                self._save_graphs(output_owner, graphs)
        except Exception as e:
            raise RuntimeError("Serialization error occurred.") from e

    def _save_targets(self, output_owner, targets) -> None:
        """Create a TargetSummary plus its utility summaries for each entry."""
        heat_suppliers = []
        heat_receivers = []
        for raw_entry in targets:
            # Work on a shallow copy so the caller's response is left intact.
            entry = dict(raw_entry)
            # Pop data that should not be included in the target creation
            temp_pinch_data = entry.pop('temp_pinch', None)
            hot_utilities = entry.pop('hot_utilities', None) or []
            cold_utilities = entry.pop('cold_utilities', None) or []

            temp_pinch = None
            if temp_pinch_data:
                temp_pinch = PinchTemp.objects.create(**temp_pinch_data, flowsheet=self.flowsheet)

            # Saved one by one because the utility summaries below reference
            # it. The former follow-up bulk_create(ignore_conflicts=True) of
            # these same rows only re-inserted existing rows and was dropped.
            target = TargetSummary(
                output_owner=output_owner,
                temp_pinch=temp_pinch,
                **entry,
                flowsheet=self.flowsheet
            )
            target.save()

            heat_suppliers.extend(
                HeatSupplierUtilitySummary(
                    summary_owner=target,
                    **supplier_utility,
                    flowsheet=self.flowsheet
                )
                for supplier_utility in hot_utilities
            )
            heat_receivers.extend(
                HeatReceiverUtilitySummary(
                    summary_owner=target,
                    **receiver_utility,
                    flowsheet=self.flowsheet
                )
                for receiver_utility in cold_utilities
            )

        # Bulk create objects
        HeatSupplierUtilitySummary.objects.bulk_create(heat_suppliers)
        HeatReceiverUtilitySummary.objects.bulk_create(heat_receivers)

    def _save_graphs(self, output_owner, graphs) -> None:
        """Create graph sets with their nested graphs, curves and data points."""
        graph_set_list = []
        graph_list = []
        curve_list = []
        data_point_list = []

        for graph_set_data in graphs.values():
            graph_set = PinchGraphSet(
                output_owner=output_owner,
                name=graph_set_data.get('name'),
                flowsheet=self.flowsheet
            )
            graph_set_list.append(graph_set)

            # Create nested graphs
            for graph_data in graph_set_data.get('graphs', []):
                graph = PinchGraph(
                    graph_set=graph_set,
                    name=graph_data.get('name'),
                    type=graph_data.get('type', pinchEnums.GraphType.CC),
                    flowsheet=self.flowsheet
                )
                graph_list.append(graph)

                # Create nested curves
                for segment_data in graph_data.get('segments', []):
                    curve = PinchCurve(
                        graph=graph,
                        title=segment_data.get('title'),
                        colour=segment_data.get('colour', pinchEnums.LineColour.Hot),
                        arrow=segment_data.get('arrow', pinchEnums.ArrowHead.NO_ARROW),
                        flowsheet=self.flowsheet
                    )
                    curve_list.append(curve)

                    for point in segment_data.get('data_points', []):
                        data_point_list.append(GraphDataPoint(
                            curve=curve,
                            x=point.get('x'),
                            y=point.get('y'),
                            flowsheet=self.flowsheet
                        ))

        # Bulk create the objects, parents first so children can reference them
        PinchGraphSet.objects.bulk_create(graph_set_list)
        PinchGraph.objects.bulk_create(graph_list)
        PinchCurve.objects.bulk_create(curve_list)
        GraphDataPoint.objects.bulk_create(data_point_list)

    def get_linear_streams(self, response_data):
        """Return the ``streams`` entry of a linearize response."""
        return response_data['streams']