Coverage for backend/django/core/auxiliary/views/ExtractSegmentDataFromFS.py: 87%
128 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
1import traceback
2from typing import Union, Tuple
3from rest_framework.decorators import api_view
4from rest_framework.response import Response
5from drf_spectacular.utils import extend_schema
6from rest_framework import serializers
7from idaes_factory.unit_conversion.unit_conversion import convert_value
8from core.validation import api_view_validate
10from PinchAnalysis.models.InputModels import Segment, StreamDataEntry
11from PinchAnalysis.models.HenNode import HenNode
12from PinchAnalysis.models.StreamDataProject import StreamDataProject
13from PinchAnalysis.views.henNodeHelpers import group_stream_by_unitop_type, set_hennode_connections
14from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
15from core.auxiliary.enums.pinchEnums import StreamType
16from core.auxiliary.enums.unitOpData import SimulationObjectClass
17from pinch_factory.pinch_factory import PinchFactory
18from django.db.models import Q
20DECIMAL_PLACES = 3
21VARIANCE = 0.01
24def get_compounds(stream, include_null: bool = False) -> set[tuple[str, float]]:
25 """
26 Returns a set of tuples containing the index (key) and value of all property
27 value objects in the stream's mole_frac_comp. Used for composition comparisons.
29 Returns:
30 - set[tuple[str, float]]: A set of (key, value) tuples.
31 """
32 mole_frac_comp = stream.properties.get_property("mole_frac_comp")
33 property_values = mole_frac_comp.values.all()
34 result = [
35 (prop.get_index("compound").key, prop.value)
36 for prop in property_values
37 if (prop.value not in [None, ""] or include_null)
38 ]
39 return result
42def create_he_streams(sim_obj, group) -> None:
43 streamDataProject = StreamDataProject.objects.first()
44 stream_ls: list[StreamDataEntry] = []
46 for key in sim_obj.schema.propertyPackagePorts.keys():
47 if key != "__none__": 47 ↛ 46line 47 didn't jump to line 46 because the condition on line 47 was always true
48 stream_ls.append(
49 StreamDataEntry(
50 flowsheet=group.flowsheet,
51 streamDataProject=streamDataProject,
52 unitop=sim_obj,
53 group=group,
54 property_package_mapping=key,
55 )
56 )
57 StreamDataEntry.objects.bulk_create(stream_ls)
60def compare_compositions(stream_1: SimulationObject, stream_2: SimulationObject) -> bool:
61 """
62 Compares the composition of two streams
63 Returns: boolean indicating equality - True if compositions are equal
64 """
65 stream_1_set = get_compounds(stream_1)
66 stream_2_set = get_compounds(stream_2)
67 return stream_1_set == stream_2_set
69# This needs to be revised and maybe moved elsewhere. The dT is WRONG. also it needs to be ln(dT).
70# specifically, we need dT of cold segment, and dT of hot segment (pairs from exchanger(?), and we dont have that ehre.)
71def _calc_area(htc: float, q_kw: float, t_supply_c: float, t_target_c: float) -> float:
72 U = float(htc or 0)
73 Q = float(q_kw or 0)
74 dT = abs(float(t_supply_c or 0) - float(t_target_c or 0))
75 if U <= 0 or dT <= 0: 75 ↛ 76line 75 didn't jump to line 76 because the condition on line 75 was never true
76 return 0.0
77 return Q / (U * dT)
80def _get_io_stream_properties(streamDataEntry: StreamDataEntry, prop_arg: str, tar_unit: str) -> Tuple[float,float]:
81 i, o = getattr(streamDataEntry, prop_arg)
82 i_val = i.get_value()
83 o_val = o.get_value()
84 if i_val is None or o_val is None: 84 ↛ 85line 84 didn't jump to line 85 because the condition on line 84 was never true
85 return None, None
86 else:
87 supply = convert_value(i_val, i.unit, tar_unit)
88 target = convert_value(o_val, o.unit, tar_unit)
89 return supply, target
92def _get_stream_type(streamDataEntry: StreamDataEntry):
93 if streamDataEntry.unitop.objectType == SimulationObjectClass.HeatExchanger:
94 if streamDataEntry.property_package_mapping == "Cold Side":
95 stream_type = StreamType.Cold.value
96 else:
97 stream_type = StreamType.Hot.value
98 else:
99 if streamDataEntry.unitop.objectType == SimulationObjectClass.Heater:
100 stream_type = StreamType.Cold.value
101 else:
102 stream_type = StreamType.Hot.value
103 return stream_type
106def _get_terminal_states(streamDataEntry: StreamDataEntry) -> dict:
107 inlet_stream, _ = streamDataEntry.inlet_outlet_stream
108 if not inlet_stream: 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true
109 return None
111 mole_flow = convert_value(
112 inlet_stream.properties.get_property("flow_mol").get_value(),
113 inlet_stream.properties.get_property("flow_mol").unit,
114 "mol/s",
115 )
116 service_T_supply, service_T_target = _get_io_stream_properties(streamDataEntry, "temperatures", "degK")
117 service_P_supply, service_P_target = _get_io_stream_properties(streamDataEntry, "pressures", "Pa")
118 service_H_supply, service_H_target = _get_io_stream_properties(streamDataEntry, "enthalpies", "J/mol")
120 if service_T_supply == None or service_P_supply == None or service_H_target == None: 120 ↛ 121line 120 didn't jump to line 121 because the condition on line 120 was never true
121 return None
122 else:
123 return {
124 "streams_io_props": [{
125 "t_supply": service_T_supply,
126 "t_target": service_T_target,
127 "p_supply": service_P_supply,
128 "p_target": service_P_target,
129 "h_supply": service_H_supply,
130 "h_target": service_H_target,
131 "composition": get_compounds(inlet_stream),
132 }],
133 "ppKey": streamDataEntry.unitop.get_property_package(streamDataEntry.property_package_mapping).propertyPackage,
134 "mole_flow": mole_flow,
135 "comp_name": inlet_stream.componentName,
136 "stream_type": _get_stream_type(streamDataEntry),
137 "streamDataEntry": streamDataEntry,
138 }
141def _stream_segment_creator(comp_name: str, points: list, stream_type: str, streamDataEntry: StreamDataEntry, **_):
142 # Create segments after linearization
143 segments = []
144 for index in range(len(points) - 1):
145 t_supply = convert_value(points[index][1], "degK", "degC")
146 t_target = check_target_temperature_validity(
147 t_supply,
148 convert_value(points[index + 1][1], "degK", "degC"),
149 stream_type
150 )
151 heat_flow = abs(
152 convert_value(points[index + 1][0] - points[index][0], "W", "kW")
153 )
154 htc=1
155 segments.append(
156 Segment(
157 stream_data_entry=streamDataEntry,
158 name=f"{comp_name} ({index + 1})" if len(points) > 1 else comp_name,
159 t_supply=t_supply,
160 t_target=t_target,
161 heat_flow=heat_flow,
162 htc=htc,
163 area=_calc_area(htc,heat_flow, t_supply, t_target),
164 flowsheet=streamDataEntry.flowsheet,
165 )
166 )
167 return segments
170@api_view_validate
171@api_view(['POST'])
172def extract_stream_data(request) -> Response:
173 try:
174 flowsheet_id = request.GET.get("flowsheet")
175 factory = PinchFactory(flowsheet_id)
177 streamDataEntries = StreamDataEntry.objects.filter(custom=False)
179 segments = []
180 # i'll just delete hennodes and create new ones for now
181 HenNode.objects.filter(flowsheet_id=flowsheet_id).delete()
183 for streamDataEntry in streamDataEntries:
184 streamDataEntry.Segments.all().delete()
185 terminal_data = _get_terminal_states(streamDataEntry)
186 if terminal_data is None: 186 ↛ 187line 186 didn't jump to line 187 because the condition on line 186 was never true
187 continue
188 terminal_data["prev_states"] = streamDataEntry.states if hasattr(streamDataEntry, "states") else None
189 t_h_data = factory.run_get_t_h_data(**terminal_data)
191 streamDataEntry.t_h_data = t_h_data["curve_points"]
192 streamDataEntry.states = t_h_data["states"]
193 streamDataEntry.save()
195 linearised_points = factory.run_linearize(t_h_data["curve_points"], **terminal_data)
196 new_segments = _stream_segment_creator(points=linearised_points, **terminal_data)
197 segments.extend(new_segments)
199 Segment.objects.bulk_create(segments)
201 # create hennodes
202 created_segments = list(Segment.objects.filter(id__in=[s.id for s in segments]))
203 sdes = {seg.stream_data_entry for seg in created_segments if seg.stream_data_entry}
205 processed_stream_ids = set(HenNode.objects.filter(
206 Q(stream_data_entry__in=sdes) |
207 Q(hot_connection__in=sdes) |
208 Q(cold_connection__in=sdes)
209 ).values_list('stream_data_entry_id', flat=True))
211 hennodes_to_create = []
213 # group streams by unitop type
214 grouped_by_unitop = group_stream_by_unitop_type(sdes, processed_stream_ids, hennodes_to_create)
216 # set connections for the hennodes
217 set_hennode_connections(grouped_by_unitop, processed_stream_ids, hennodes_to_create)
219 HenNode.objects.bulk_create(hennodes_to_create)
221 # link segments to hennodes
222 for segment in created_segments:
223 sde = segment.stream_data_entry
224 if sde: 224 ↛ 222line 224 didn't jump to line 222 because the condition on line 224 was always true
225 hn = HenNode.objects.filter(
226 Q(stream_data_entry=sde) |
227 Q(hot_connection=sde) |
228 Q(cold_connection=sde)
229 ).first()
230 if hn: 230 ↛ 222line 230 didn't jump to line 222 because the condition on line 230 was always true
231 segment.hen_node = hn
232 segment.save(update_fields=['hen_node'])
234 return Response(status=200)
236 except Exception as e:
237 tb_info = traceback.format_exc()
238 print(tb_info)
239 error_message = str(e)
240 response_data = {
241 "status": "error",
242 "message": error_message,
243 "traceback": tb_info
244 }
245 return Response(response_data, status=500)
247def check_target_temperature_validity(t_supply: float, t_target: float, stream_type: StreamType, min_delta_t: float = 0.0001) -> tuple[float, float]:
248 if abs(t_supply - t_target) < min_delta_t: 248 ↛ 249line 248 didn't jump to line 249 because the condition on line 248 was never true
249 t_target = t_supply - min_delta_t if stream_type == StreamType.Hot.value else t_supply + min_delta_t
250 return t_target