Coverage for backend/django/core/auxiliary/views/ExtractSegmentDataFromFS.py: 87%
132 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1import traceback
2from typing import Union, Tuple
3from rest_framework.decorators import api_view
4from rest_framework.response import Response
5from drf_spectacular.utils import extend_schema
6from rest_framework import serializers, status
7from idaes_factory.unit_conversion.unit_conversion import convert_value
8from core.validation import api_view_validate
9from core.managers import get_flowsheet_access
11from PinchAnalysis.models.InputModels import Segment, StreamDataEntry
12from PinchAnalysis.models.HenNode import HenNode
13from PinchAnalysis.models.StreamDataProject import StreamDataProject
14from PinchAnalysis.views.henNodeHelpers import group_stream_by_unitop_type, set_hennode_connections
15from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
16from core.auxiliary.enums.pinchEnums import StreamType
17from core.auxiliary.enums.unitOpData import SimulationObjectClass
18from pinch_factory.pinch_factory import PinchFactory
19from django.db.models import Q
21DECIMAL_PLACES = 3
22VARIANCE = 0.01
def get_compounds(stream, include_null: bool = False) -> set[tuple[str, float]]:
    """
    Return the (compound key, value) pairs of all property value objects in the
    stream's ``mole_frac_comp``. Used for composition comparisons.

    Args:
        stream: Object exposing ``properties.get_property("mole_frac_comp")``.
        include_null: When True, pairs whose value is None or "" are included too.

    Returns:
        set[tuple[str, float]]: A set of (key, value) tuples.
    """
    mole_frac_comp = stream.properties.get_property("mole_frac_comp")
    property_values = mole_frac_comp.values.all()
    # Build a set, as the annotation and docstring promise: the original
    # returned a list, which made the equality check in compare_compositions
    # depend on iteration order of the property values.
    return {
        (prop.get_index("compound").key, prop.value)
        for prop in property_values
        if (prop.value not in [None, ""] or include_null)
    }
def create_he_streams(sim_obj, group) -> None:
    """Create one StreamDataEntry per property-package port of ``sim_obj``,
    skipping the ``"__none__"`` placeholder key, and persist them with a
    single bulk insert."""
    project = group.flowsheet.StreamDataProject
    entries: list[StreamDataEntry] = [
        StreamDataEntry(
            flowsheet=group.flowsheet,
            streamDataProject=project,
            unitop=sim_obj,
            group=group,
            property_package_mapping=port_key,
        )
        for port_key in sim_obj.schema.propertyPackagePorts.keys()
        if port_key != "__none__"
    ]
    StreamDataEntry.objects.bulk_create(entries)
def compare_compositions(stream_1: SimulationObject, stream_2: SimulationObject) -> bool:
    """
    Compare the compositions of two streams.

    Returns:
        bool: True when both streams expose equal compound/value collections.
    """
    return get_compounds(stream_1) == get_compounds(stream_2)
70# This needs to be revised and maybe moved elsewhere. The dT is WRONG. also it needs to be ln(dT).
71# specifically, we need dT of cold segment, and dT of hot segment (pairs from exchanger(?), and we dont have that ehre.)
72def _calc_area(htc: float, q_kw: float, t_supply_c: float, t_target_c: float) -> float:
73 U = float(htc or 0)
74 Q = float(q_kw or 0)
75 dT = abs(float(t_supply_c or 0) - float(t_target_c or 0))
76 if U <= 0 or dT <= 0: 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true
77 return 0.0
78 return Q / (U * dT)
def _get_io_stream_properties(streamDataEntry: StreamDataEntry, prop_arg: str, tar_unit: str) -> Tuple[float,float]:
    """Read the (inlet, outlet) property pair named ``prop_arg`` off the entry
    and convert both values to ``tar_unit``.

    Returns:
        (supply, target) in ``tar_unit``, or (None, None) if either raw value
        is missing.
    """
    inlet, outlet = getattr(streamDataEntry, prop_arg)
    raw_in = inlet.get_value()
    raw_out = outlet.get_value()
    if raw_in is None or raw_out is None:
        return None, None
    supply_val = convert_value(raw_in, inlet.unit, tar_unit)
    target_val = convert_value(raw_out, outlet.unit, tar_unit)
    return supply_val, target_val
def _get_stream_type(streamDataEntry: StreamDataEntry):
    """Classify the entry as a hot or cold stream.

    HeatExchanger entries are cold only on the "Cold Side" mapping; otherwise
    Heaters are cold and every other unit-op type is hot.
    """
    obj_type = streamDataEntry.unitop.objectType
    if obj_type == SimulationObjectClass.HeatExchanger:
        is_cold = streamDataEntry.property_package_mapping == "Cold Side"
    else:
        is_cold = obj_type == SimulationObjectClass.Heater
    return StreamType.Cold.value if is_cold else StreamType.Hot.value
def _get_terminal_states(streamDataEntry: StreamDataEntry) -> dict:
    """
    Collect the terminal (supply/target) thermodynamic state of a stream entry.

    Reads the inlet stream's molar flow and the entry's inlet/outlet
    temperature, pressure and enthalpy pairs, converted to SI-style units
    (degK, Pa, J/mol, mol/s).

    Returns:
        dict with keys ``streams_io_props``, ``ppKey``, ``mole_flow``,
        ``comp_name``, ``stream_type`` and ``streamDataEntry``; or None when
        there is no inlet stream or any property value is missing.
    """
    inlet_stream, _ = streamDataEntry.inlet_outlet_stream
    if not inlet_stream:
        return None

    # Hoisted: the original fetched the "flow_mol" property twice.
    flow_mol = inlet_stream.properties.get_property("flow_mol")
    mole_flow = convert_value(flow_mol.get_value(), flow_mol.unit, "mol/s")

    service_T_supply, service_T_target = _get_io_stream_properties(streamDataEntry, "temperatures", "degK")
    service_P_supply, service_P_target = _get_io_stream_properties(streamDataEntry, "pressures", "Pa")
    service_H_supply, service_H_target = _get_io_stream_properties(streamDataEntry, "enthalpies", "J/mol")

    # `is None` (PEP 8) instead of `== None`. Checking one member of each pair
    # is sufficient: _get_io_stream_properties returns (None, None) atomically.
    if service_T_supply is None or service_P_supply is None or service_H_target is None:
        return None
    return {
        "streams_io_props": [{
            "t_supply": service_T_supply,
            "t_target": service_T_target,
            "p_supply": service_P_supply,
            "p_target": service_P_target,
            "h_supply": service_H_supply,
            "h_target": service_H_target,
            "composition": get_compounds(inlet_stream),
        }],
        "ppKey": streamDataEntry.unitop.get_property_package(),
        "mole_flow": mole_flow,
        "comp_name": inlet_stream.componentName,
        "stream_type": _get_stream_type(streamDataEntry),
        "streamDataEntry": streamDataEntry,
    }
def _stream_segment_creator(comp_name: str, points: list, stream_type: str, streamDataEntry: StreamDataEntry, **_):
    """Build Segment rows from consecutive pairs of linearized (H, T) points.

    Each adjacent pair of points becomes one segment; temperatures are
    converted degK -> degC and the target is nudged by
    check_target_temperature_validity to guarantee a non-zero dT.
    """
    htc = 1
    needs_suffix = len(points) > 1
    segments = []
    for idx, (start, end) in enumerate(zip(points, points[1:]), start=1):
        supply_c = convert_value(start[1], "degK", "degC")
        target_c = check_target_temperature_validity(
            supply_c,
            convert_value(end[1], "degK", "degC"),
            stream_type
        )
        duty_kw = abs(convert_value(end[0] - start[0], "W", "kW"))
        segments.append(
            Segment(
                stream_data_entry=streamDataEntry,
                name=f"{comp_name} ({idx})" if needs_suffix else comp_name,
                t_supply=supply_c,
                t_target=target_c,
                heat_flow=duty_kw,
                htc=htc,
                area=_calc_area(htc, duty_kw, supply_c, target_c),
                flowsheet=streamDataEntry.flowsheet,
            )
        )
    return segments
@api_view_validate
@api_view(['POST'])
def extract_stream_data(request) -> Response:
    """POST endpoint: rebuild pinch-analysis segments and HEN nodes for a
    flowsheet from its non-custom stream data entries.

    Side effects: deletes all existing HenNodes for the flowsheet and all
    Segments of each processed entry, then recreates both from freshly
    computed T-H curve data.

    Returns 403 for read-only shares, 200 on success, 500 with traceback
    text on any exception.
    """
    try:
        flowsheet_id = request.GET.get("flowsheet")
        access_state = get_flowsheet_access(request.user, flowsheet_id)
        # Write access is required: reject users who only have read access.
        if access_state.has_read_access and not access_state.has_write_access:
            return Response(
                {"error": "This flowsheet is shared with read-only access."},
                status=status.HTTP_403_FORBIDDEN,
            )
        factory = PinchFactory(flowsheet_id)

        # Only auto-extracted (non-custom) entries are rebuilt here.
        streamDataEntries = StreamDataEntry.objects.filter(flowsheet_id=flowsheet_id, custom=False)

        segments = []
        # i'll just delete hennodes and create new ones for now
        HenNode.objects.filter(flowsheet_id=flowsheet_id).delete()

        for streamDataEntry in streamDataEntries:
            # Old segments are replaced wholesale for this entry.
            streamDataEntry.Segments.all().delete()
            terminal_data = _get_terminal_states(streamDataEntry)
            if terminal_data is None:
                # Entry has no inlet stream or missing property values — skip.
                continue
            # Previous solver states (if any) are passed so the factory can
            # warm-start; `states` may not exist on a fresh entry.
            terminal_data["prev_states"] = streamDataEntry.states if hasattr(streamDataEntry, "states") else None
            t_h_data = factory.run_get_t_h_data(**terminal_data)

            # Persist the raw T-H curve and solver states on the entry.
            streamDataEntry.t_h_data = t_h_data["curve_points"]
            streamDataEntry.states = t_h_data["states"]
            streamDataEntry.save()

            linearised_points = factory.run_linearize(t_h_data["curve_points"], **terminal_data)
            new_segments = _stream_segment_creator(points=linearised_points, **terminal_data)
            segments.extend(new_segments)

        Segment.objects.bulk_create(segments)

        # create hennodes
        # Re-fetch by id so we work with DB-backed rows (bulk_create populates
        # ids on backends that support it — presumably the case here; confirm).
        created_segments = list(Segment.objects.filter(id__in=[s.id for s in segments]))
        sdes = {seg.stream_data_entry for seg in created_segments if seg.stream_data_entry}

        # Entries already referenced by a HenNode (as owner or as a hot/cold
        # connection) must not get a second node.
        processed_stream_ids = set(HenNode.objects.filter(
            Q(stream_data_entry__in=sdes) |
            Q(hot_connection__in=sdes) |
            Q(cold_connection__in=sdes)
        ).values_list('stream_data_entry_id', flat=True))

        hennodes_to_create = []

        # group streams by unitop type
        grouped_by_unitop = group_stream_by_unitop_type(sdes, processed_stream_ids, hennodes_to_create)

        # set connections for the hennodes
        set_hennode_connections(grouped_by_unitop, processed_stream_ids, hennodes_to_create)

        HenNode.objects.bulk_create(hennodes_to_create)

        # link segments to hennodes
        # NOTE(review): one HenNode query per segment (N+1); acceptable for
        # small flowsheets but a candidate for a prefetch/bulk rewrite.
        for segment in created_segments:
            sde = segment.stream_data_entry
            if sde:
                hn = HenNode.objects.filter(
                    Q(stream_data_entry=sde) |
                    Q(hot_connection=sde) |
                    Q(cold_connection=sde)
                ).first()
                if hn:
                    segment.hen_node = hn
                    segment.save(update_fields=['hen_node'])

        return Response(status=200)

    except Exception as e:
        # Top-level boundary: return the traceback in the payload for debugging.
        tb_info = traceback.format_exc()
        print(tb_info)
        error_message = str(e)
        response_data = {
            "status": "error",
            "message": error_message,
            "traceback": tb_info
        }
        return Response(response_data, status=500)
254def check_target_temperature_validity(t_supply: float, t_target: float, stream_type: StreamType, min_delta_t: float = 0.0001) -> tuple[float, float]:
255 if abs(t_supply - t_target) < min_delta_t: 255 ↛ 256line 255 didn't jump to line 256 because the condition on line 255 was never true
256 t_target = t_supply - min_delta_t if stream_type == StreamType.Hot.value else t_supply + min_delta_t
257 return t_target