Coverage for backend/core/auxiliary/views/ExtractSegmentDataFromFS.py: 31%
102 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1import traceback
2from typing import Union
3from rest_framework.decorators import api_view
4from rest_framework.response import Response
5from drf_spectacular.utils import extend_schema
6from rest_framework import serializers
7from idaes_factory.unit_conversion.unit_conversion import convert_value
8from core.validation import api_view_validate
10from PinchAnalysis.models.InputModels import Segment, StreamDataEntry
11from PinchAnalysis.models.StreamDataProject import StreamDataProject
12from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
13from core.auxiliary.enums.pinchEnums import StreamType
14from core.auxiliary.enums.unitOpData import SimulationObjectClass
15from pinch_factory.pinch_factory import PinchFactory
17DECIMAL_PLACES = 3
18VARIANCE = 0.01
21def get_compounds(stream, include_null: bool = False) -> set[tuple[str, float]]:
22 """
23 Returns a set of tuples containing the index (key) and value of all property
24 value objects in the stream's mole_frac_comp. Used for composition comparisons.
26 Returns:
27 - set[tuple[str, float]]: A set of (key, value) tuples.
28 """
29 mole_frac_comp = stream.properties.get_property("mole_frac_comp")
30 property_values = mole_frac_comp.values.all()
31 result = [
32 (prop.get_index("compound").key, prop.value)
33 for prop in property_values
34 if (prop.value not in [None, ""] or include_null)
35 ]
36 return result
39def create_he_streams(sim_obj, group) -> None:
40 streamDataProject = StreamDataProject.objects.first()
41 stream_ls: list[StreamDataEntry] = []
43 for key in sim_obj.schema.propertyPackagePorts.keys():
44 if key != "__none__": 44 ↛ 43line 44 didn't jump to line 43 because the condition on line 44 was always true
45 stream_ls.append(
46 StreamDataEntry(
47 flowsheet=group.flowsheet,
48 streamDataProject=streamDataProject,
49 unitop=sim_obj,
50 group=group,
51 property_package_mapping=key,
52 )
53 )
54 StreamDataEntry.objects.bulk_create(stream_ls)
57def compare_compositions(stream_1: SimulationObject, stream_2: SimulationObject) -> bool:
58 """
59 Compares the composition of two streams
60 Returns: boolean indicating equality - True if compositions are equal
61 """
62 stream_1_set = get_compounds(stream_1)
63 stream_2_set = get_compounds(stream_2)
64 return stream_1_set == stream_2_set
69@api_view_validate
70@api_view(['POST'])
71def extract_stream_data(request) -> Response:
72 try:
73 flowsheet_id = request.GET.get("flowsheet")
74 factory = PinchFactory(flowsheet_id)
76 streamDataEntries = StreamDataEntry.objects.filter(custom=False)
78 segments = []
80 for streamDataEntry in streamDataEntries:
81 # im just going to delete the segments and create new ones for now
82 streamDataEntry.Segments.all().delete()
84 inlet_stream, outlet_stream = streamDataEntry.inlet_outlet_stream
85 if not inlet_stream:
86 continue
87 inlet_properties = inlet_stream.properties
88 mole_flow = (inlet_properties.get_property(
89 "flow_mol").get_value(), inlet_properties.get_property("flow_mol").unit)
90 sevice_moleFlow = convert_value(
91 mole_flow[0], mole_flow[1], 'mol/s')
93 # SI Units
94 inlet_temp, outlet_temp = streamDataEntry.temperatures
96 inlet_temp_val = inlet_temp.get_value()
97 outlet_temp_val = outlet_temp.get_value()
98 if inlet_temp_val is None or outlet_temp_val is None:
99 continue
100 service_T_supply = convert_value(
101 inlet_temp_val, inlet_temp.unit, 'degK')
102 service_T_target = convert_value(
103 outlet_temp_val, outlet_temp.unit, 'degK')
105 inlet_pressure, outlet_pressure = streamDataEntry.pressures
106 inlet_pressure_val = inlet_pressure.get_value()
107 outlet_pressure_val = outlet_pressure.get_value()
108 if inlet_pressure_val is None or outlet_pressure_val is None:
109 continue
110 service_P_supply = convert_value(
111 inlet_pressure_val, inlet_pressure.unit, 'Pa')
112 service_P_target = convert_value(
113 outlet_pressure_val, outlet_pressure.unit, 'Pa')
115 inlet_enthalpy, outlet_enthalpy = streamDataEntry.enthalpies
116 inlet_enthalpy_val = inlet_enthalpy.get_value()
117 outlet_enthalpy_val = outlet_enthalpy.get_value()
118 if inlet_enthalpy_val is None or outlet_enthalpy_val is None:
119 continue
120 service_H_supply = convert_value(
121 inlet_enthalpy_val, inlet_enthalpy.unit, 'J/mol')
122 service_H_target = convert_value(
123 outlet_enthalpy_val, outlet_enthalpy.unit, 'J/mol')
125 data = [{
126 't_supply': service_T_supply,
127 't_target': service_T_target,
128 'p_supply': service_P_supply,
129 'p_target': service_P_target,
130 'h_supply': service_H_supply,
131 'h_target': service_H_target,
132 'composition': get_compounds(inlet_stream),
133 }]
134 mole_flow = sevice_moleFlow
135 ppKey = streamDataEntry.unitop.get_property_package(
136 streamDataEntry.property_package_mapping).propertyPackage
138 t_h_data = factory.run_get_t_h_data(data, mole_flow, ppKey)
140 curve_points = t_h_data['curve_points']
141 states = t_h_data['states']
143 streamDataEntry.t_h_data = t_h_data
144 streamDataEntry.states = states
145 streamDataEntry.save()
147 points = factory.run_linearize(
148 curve_points,
149 data,
150 mole_flow,
151 ppKey
152 )
154 # create segments after linearization
155 if streamDataEntry.unitop.objectType == SimulationObjectClass.HeatExchanger:
156 if streamDataEntry.property_package_mapping == "Cold Side":
157 stream_type = StreamType.Cold.value
158 else:
159 stream_type = StreamType.Hot.value
160 else:
161 stream_type = StreamType.Cold.value if streamDataEntry.unitop.objectType == SimulationObjectClass.Heater else StreamType.Hot.value
163 # if no new points are created, skip
164 if len(points) == 1:
165 heat_flow = streamDataEntry.heat_flow.get_value()
166 segments.append(Segment(
167 stream_data_entry=streamDataEntry,
168 name=f"{inlet_stream.componentName}",
169 t_supply=inlet_temp.get_value(),
170 t_target=outlet_temp.get_value(),
171 heat_flow=heat_flow,
172 flowsheet=streamDataEntry.flowsheet
173 ))
174 else:
175 # Go through points to generate segments
176 for index in range(len(points) - 1):
177 t_supply = convert_value(
178 points[index][1], 'degK', 'degC')
179 segments.append(Segment(
180 stream_data_entry=streamDataEntry,
181 name=f"{inlet_stream.componentName} ({index + 1})",
182 t_supply=t_supply,
183 t_target=check_target_temperature_validity(t_supply, convert_value(
184 points[index + 1][1], 'degK', 'degC'), stream_type),
185 heat_flow=abs(convert_value(
186 points[index + 1][0] - points[index][0], 'W', 'kW')),
187 flowsheet=streamDataEntry.flowsheet
188 ))
190 Segment.objects.bulk_create(segments)
192 return Response(status=200)
194 except Exception as e:
195 tb_info = traceback.format_exc()
196 print(tb_info)
197 error_message = str(e)
198 response_data = {'status': 'error',
199 'message': error_message, 'traceback': tb_info}
200 return Response(response_data, status=500)
202def check_target_temperature_validity(t_supply: float, t_target: float, stream_type: StreamType, min_delta_t: float = 0.01) -> tuple[float, float]:
203 if abs(t_supply - t_target) < min_delta_t:
204 t_target = t_supply - \
205 min_delta_t if stream_type == StreamType.Hot.value else t_supply + min_delta_t
206 return t_target