Coverage for backend/django/pinch_factory/pinch_factory.py: 81%
158 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
1import json
2import os
3import traceback
4from typing import Any, Dict
5import requests
7from PinchAnalysis.models.OutputModels import GraphDataPoint, HeatReceiverUtilitySummary, HeatSupplierUtilitySummary, PinchCurve, PinchGraph, PinchGraphSet, PinchTemp, TargetSummary
8from PinchAnalysis.models.StreamDataProject import StreamDataProject
9from PinchAnalysis.models.InputModels import PinchUtility, Segment
10from core.auxiliary.enums.generalEnums import AbstractionType
11from core.auxiliary.enums import pinchEnums
12from PinchAnalysis.serializers.PinchInputSerializers import SegmentSerializer
13from core.auxiliary.models.Flowsheet import Flowsheet
class PinchFactory:
    """
    Builds request payloads for the pinch service, posts them, and stores results.

    The service base URL is read from the ``PINCH_SERVICE_URL`` environment
    variable and falls back to ``http://localhost:8082``. Three endpoints are
    used: ``calculate``, ``linearize`` and ``generate_t_h_curve``.
    """

    def __init__(self, flowsheet_id: int, num_intervals: int = 20, t_min: float = 1) -> None:
        """
        Look up the stream data project attached to the given flowsheet.

        Args:
            flowsheet_id: Primary key of the Flowsheet whose project is used.
            num_intervals: Sampling intervals for stream linearisation.
            t_min: Maximum temperature difference between actual stream
                samples and the linearisation curve.
        """
        self.project = Flowsheet.objects.get(pk=flowsheet_id).StreamDataProject
        self.flowsheet = self.project.flowsheet

        # Sampling intervals for stream linearisation
        self.num_intervals = num_intervals
        # Maximum temperature difference between actual stream samples and linearisation curve
        self.t_min = t_min

    def build_calculate_request(self, excluded_segments: list[int]) -> dict:
        """
        Extract and prepare inputs from the project data.

        Args:
            excluded_segments: Ids of segments to leave out of the request.

        Returns:
            A dict with the keys 'streams', 'utilities', 'options' and 'zone_tree'.
        """
        # Build the lookup once so each membership test is O(1).
        excluded_ids = set(excluded_segments or ())

        # NOTE(review): this reads every Segment / PinchUtility returned by the
        # default manager rather than filtering by self.project — confirm the
        # manager already scopes rows to the current flowsheet.
        request_data = {}
        request_data['streams'] = [
            {
                "zone": segment.zone,
                "name": segment.name,
                "t_supply": segment.t_supply,
                "t_target": segment.t_target,
                "heat_flow": segment.heat_flow,
                "dt_cont": segment.dt_cont,
                "htc": segment.htc,
            }
            for segment in Segment.objects.all()
            if segment.id not in excluded_ids
        ]
        request_data['utilities'] = [
            {
                "name": utility.name,
                "type": utility.type,
                "t_supply": utility.t_supply,
                "t_target": utility.t_target,
                "heat_flow": utility.heat_flow,
                "dt_cont": utility.dt_cont,
                "htc": utility.htc,
                "price": utility.price,
            }
            for utility in PinchUtility.objects.all()
        ]

        options = self.project.Options
        request_data['options'] = {
            # Only options whose value is exactly True are sent as selected.
            'main': [
                prop.key
                for prop in options.selections.containedProperties.all()
                if prop.get_value() is True
            ],
            'turbine': [
                {"key": prop.key, "value": prop.get_value()}
                for prop in options.turbine_options.properties.containedProperties.all()
            ],
        }
        request_data['zone_tree'] = self.build_zone_structure()

        return request_data

    def build_zone_structure(self) -> "dict | None":
        """
        Build the nested zone tree for the project's stream data entries.

        Every stream data entry contributes a node keyed by its zone. Ancestor
        groups that have no stream data entry of their own are added as
        intermediate nodes, walking upwards until a group without a parent is
        reached, so hierarchies of any depth resolve to a root.

        Returns:
            A nested dict of the form ``{"name", "type", "children"}`` rooted
            at the last parentless group found, or None when there is none
            (for example when the project has no stream data entries).
        """
        # Step 1: Collect all groups related to the StreamDataEntries
        nodes = {}
        for stream_data_entry in self.project.StreamDataEntries.all():
            nodes[stream_data_entry.zone] = {
                "children": [],
                "group": stream_data_entry.group,
            }

        # Step 2: Organize into tree, one level of newly discovered ancestors at a time
        root_node = None
        pending = list(nodes.values())
        while pending:
            discovered = []
            for node in pending:
                parent_group = node["group"].get_parent_group()
                if not parent_group:
                    root_node = node
                    continue
                parent_zone = parent_group.simulationObject.componentName
                parent_node = nodes.get(parent_zone)
                if parent_node is None:
                    # Ancestor without stream data: create it and resolve its
                    # own parent on the next pass.
                    parent_node = {"children": [], "group": parent_group}
                    nodes[parent_zone] = parent_node
                    discovered.append(parent_node)
                parent_node["children"].append(node)
            pending = discovered

        # Step 3: Format it nicely
        def clean_node(node):
            return {
                "name": node["group"].simulationObject.componentName,
                "type": node["group"].abstractionType,
                "children": [clean_node(child) for child in node["children"]],
            }

        if root_node is None:
            return None
        return clean_node(root_node)

    def build_linearize_request(self, t_h_data, streams_io_props: list[dict], mole_flow: float, ppKey: str) -> dict:
        """Return the payload for the 'linearize' endpoint."""
        return {
            't_h_data': t_h_data,
            't_min': self.t_min,
            'num_intervals': self.num_intervals,
            'streams': streams_io_props,
            'mole_flow': mole_flow,
            'ppKey': ppKey,
        }

    def build_t_h_request(self, streams_io_props: list[dict], mole_flow: float, ppKey: str, prev_states=None) -> dict:
        """Return the payload for the 'generate_t_h_curve' endpoint."""
        return {
            't_min': self.t_min,
            'num_intervals': self.num_intervals,
            'streams': streams_io_props,
            'mole_flow': mole_flow,
            'ppKey': ppKey,
            'prev_states': prev_states,
        }

    def clear_outputs(self) -> None:
        """
        Removes the previous outputs from the project
        """
        output_owner = self.project.Outputs
        output_owner.targets.all().delete()
        output_owner.graph_sets.all().delete()

    def _post_to_pinch_service(self, endpoint: str, request_data: dict):
        """
        POST request_data as JSON to the given pinch service endpoint.

        Returns:
            The decoded JSON body of a 200 response.

        Raises:
            Exception: If the status code is not 200. The message carries the
                status code and the JSON body, or the raw text when the body
                is not valid JSON.
        """
        base_url = os.getenv('PINCH_SERVICE_URL') or "http://localhost:8082"
        result = requests.post(base_url + "/" + endpoint, json=request_data)
        if result.status_code != 200:
            try:
                detail = result.json()
            except ValueError:
                # Error bodies are not guaranteed to be JSON; keep the raw
                # text so the real failure is not hidden by a decode error.
                detail = result.text
            raise Exception(f"Pinch service returned status {result.status_code}: {detail}")
        return result.json()

    @staticmethod
    def _log_failure(error: Exception) -> None:
        """Print the error and the active traceback; call from an except block."""
        print("Error during calculation:", error)
        print("Traceback:", traceback.format_exc())

    def run_calculate(self, excluded_segments: list[int]) -> None:
        """
        Format data and send request to pinch service

        Previous outputs are cleared only after the service has returned a
        parsable 200 response; the new results are then stored.

        Raises:
            RuntimeError: If building the request, calling the service or
                storing the results fails.
        """
        try:
            request_data = self.build_calculate_request(excluded_segments)
            response_data = self._post_to_pinch_service("calculate", request_data)
            self.clear_outputs()
            self.serialize_return_data(response_data)
        except Exception as e:
            self._log_failure(e)
            raise RuntimeError("Calculation error occurred.") from e

    def run_linearize(self, t_h_data, streams_io_props: list[dict], mole_flow: float, ppKey: str, **_):
        """
        Linearizes a stream curve

        Extra keyword arguments are accepted and ignored.

        Returns:
            The 'streams' entry of the service response.

        Raises:
            RuntimeError: If the request fails or the response is unusable.
        """
        try:
            request_data = self.build_linearize_request(t_h_data, streams_io_props, mole_flow, ppKey)
            response_data = self._post_to_pinch_service("linearize", request_data)
            return self.get_linear_streams(response_data)
        except Exception as e:
            self._log_failure(e)
            raise RuntimeError("Calculation error occurred.") from e

    def serialize_return_data(self, response_data):
        """
        Converts output data to db entries, including nested objects.

        Reads the optional 'targets' and 'graphs' entries of response_data.
        The response data itself is not modified.

        Raises:
            RuntimeError: If any part of the conversion or saving fails.
        """
        try:
            output_owner = self.project.Outputs
            targets = response_data.get('targets', None)
            graphs = response_data.get('graphs', None)

            # Target Objects
            if targets:
                heat_suppliers = []
                heat_receivers = []
                for entry in targets:
                    # Work on a shallow copy so the caller's data is left intact
                    # while the nested parts are split off.
                    target_fields = dict(entry)
                    temp_pinch_data = target_fields.pop('temp_pinch', None)
                    hot_utilities = target_fields.pop('hot_utilities', None) or []
                    cold_utilities = target_fields.pop('cold_utilities', None) or []

                    temp_pinch = (
                        PinchTemp.objects.create(**temp_pinch_data, flowsheet=self.flowsheet)
                        if temp_pinch_data
                        else None
                    )

                    # Saved immediately (not bulk-created) so the utility
                    # summaries below can reference a stored row.
                    target = TargetSummary(
                        output_owner=output_owner,
                        temp_pinch=temp_pinch,
                        **target_fields,
                        flowsheet=self.flowsheet
                    )
                    target.save()

                    for supplier_utility in hot_utilities:
                        heat_suppliers.append(HeatSupplierUtilitySummary(
                            summary_owner=target,
                            **supplier_utility,
                            flowsheet=self.flowsheet
                        ))

                    for receiver_utility in cold_utilities:
                        heat_receivers.append(HeatReceiverUtilitySummary(
                            summary_owner=target,
                            **receiver_utility,
                            flowsheet=self.flowsheet
                        ))

                # Bulk create the nested utility summaries
                HeatSupplierUtilitySummary.objects.bulk_create(heat_suppliers)
                HeatReceiverUtilitySummary.objects.bulk_create(heat_receivers)

            # Graphs
            if graphs:
                graph_set_list = []
                graph_list = []
                curve_list = []
                data_point_list = []

                for graph_set_data in graphs.values():
                    graph_set = PinchGraphSet(
                        output_owner=output_owner,
                        name=graph_set_data.get('name'),
                        flowsheet=self.flowsheet
                    )
                    graph_set_list.append(graph_set)

                    # Create nested graphs
                    for graph_data in graph_set_data.get('graphs', []):
                        graph = PinchGraph(
                            graph_set=graph_set,
                            name=graph_data.get('name'),
                            type=graph_data.get('type', pinchEnums.GraphType.CC),
                            flowsheet=self.flowsheet
                        )
                        graph_list.append(graph)

                        # Create nested curves
                        for segment_data in graph_data.get('segments', []):
                            curve = PinchCurve(
                                graph=graph,
                                title=segment_data.get('title'),
                                colour=segment_data.get('colour', pinchEnums.LineColour.Hot),
                                arrow=segment_data.get('arrow', pinchEnums.ArrowHead.NO_ARROW),
                                flowsheet=self.flowsheet
                            )
                            curve_list.append(curve)

                            for point in segment_data.get('data_points', []):
                                data_point_list.append(GraphDataPoint(
                                    curve=curve,
                                    x=point.get('x'),
                                    y=point.get('y'),
                                    flowsheet=self.flowsheet
                                ))

                # Bulk create the objects, parents before children
                PinchGraphSet.objects.bulk_create(graph_set_list)
                PinchGraph.objects.bulk_create(graph_list)
                PinchCurve.objects.bulk_create(curve_list)
                GraphDataPoint.objects.bulk_create(data_point_list)

        except Exception as e:
            raise RuntimeError("Serialization error occurred.") from e

    def get_linear_streams(self, response_data):
        """Return the 'streams' entry of a linearize response."""
        return response_data['streams']

    def run_get_t_h_data(self, streams_io_props: list[dict], mole_flow: float, ppKey: str, prev_states=None, **_):
        """
        Get t_h data from streams

        Extra keyword arguments are accepted and ignored.

        Returns:
            The decoded JSON response of the 'generate_t_h_curve' endpoint.

        Raises:
            RuntimeError: If the request fails or the response is unusable.
        """
        try:
            request_data = self.build_t_h_request(streams_io_props, mole_flow, ppKey, prev_states)
            return self._post_to_pinch_service("generate_t_h_curve", request_data)
        except Exception as e:
            self._log_failure(e)
            raise RuntimeError("Calculation error occurred.") from e