Coverage for backend/core/auxiliary/views/ExtractSegmentDataFromFS.py: 31%

102 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1import traceback 

2from typing import Union 

3from rest_framework.decorators import api_view 

4from rest_framework.response import Response 

5from drf_spectacular.utils import extend_schema 

6from rest_framework import serializers 

7from idaes_factory.unit_conversion.unit_conversion import convert_value 

8from core.validation import api_view_validate 

9 

10from PinchAnalysis.models.InputModels import Segment, StreamDataEntry 

11from PinchAnalysis.models.StreamDataProject import StreamDataProject 

12from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

13from core.auxiliary.enums.pinchEnums import StreamType 

14from core.auxiliary.enums.unitOpData import SimulationObjectClass 

15from pinch_factory.pinch_factory import PinchFactory 

16 

# NOTE(review): neither constant is referenced by the functions visible in this
# module; confirm they are still needed.
# Presumably the number of decimal places used when rounding values - TODO confirm.
DECIMAL_PLACES = 3

# Presumably a tolerance used when comparing floating point values - TODO confirm.
VARIANCE = 0.01

19 

20 

def get_compounds(stream, include_null: bool = False) -> list[tuple[str, float]]:
    """
    Returns the index key and value of every property value object in the
    stream's mole_frac_comp. Used for composition comparisons.

    Args:
    - stream: object exposing properties.get_property("mole_frac_comp").
    - include_null (bool): when True, entries whose value is None or "" are
      kept; by default they are left out.

    Returns:
    - list[tuple[str, float]]: (key, value) tuples, in the order produced by
      mole_frac_comp.values.all(). This is a list, not a set; callers that
      need an order-independent comparison must convert it themselves.
    """
    mole_frac_comp = stream.properties.get_property("mole_frac_comp")
    return [
        (prop.get_index("compound").key, prop.value)
        for prop in mole_frac_comp.values.all()
        if include_null or prop.value not in (None, "")
    ]

37 

38 

def create_he_streams(sim_obj, group) -> None:
    """
    Bulk-creates one StreamDataEntry for every property package port of
    sim_obj, ignoring the "__none__" port key.

    Args:
    - sim_obj: unit operation whose schema.propertyPackagePorts keys are used
      as the property_package_mapping of the new entries.
    - group: group the entries are attached to; its flowsheet is reused for
      each entry.

    Every entry is linked to the first StreamDataProject returned by the ORM.
    """
    project = StreamDataProject.objects.first()
    port_keys = sim_obj.schema.propertyPackagePorts.keys()

    new_entries: list[StreamDataEntry] = [
        StreamDataEntry(
            flowsheet=group.flowsheet,
            streamDataProject=project,
            unitop=sim_obj,
            group=group,
            property_package_mapping=port_key,
        )
        for port_key in port_keys
        if port_key != "__none__"
    ]
    StreamDataEntry.objects.bulk_create(new_entries)

55 

56 

def compare_compositions(stream_1: SimulationObject, stream_2: SimulationObject) -> bool:
    """
    Compares the composition of two streams.

    The (compound key, value) pairs of each stream are compared as sets, so
    the result does not depend on the order in which the property values of
    either stream are returned.

    Returns: boolean indicating equality - True if compositions are equal
    """
    return set(get_compounds(stream_1)) == set(get_compounds(stream_2))

65 

66 

67 

68 

def _convert_property_pair(inlet_prop, outlet_prop, target_unit):
    """
    Converts an inlet/outlet property pair to target_unit.

    Returns a (supply, target) tuple, or None when either value is unset.
    """
    inlet_val = inlet_prop.get_value()
    outlet_val = outlet_prop.get_value()
    if inlet_val is None or outlet_val is None:
        return None
    return (
        convert_value(inlet_val, inlet_prop.unit, target_unit),
        convert_value(outlet_val, outlet_prop.unit, target_unit),
    )


def _resolve_stream_type(stream_data_entry):
    """
    Returns the StreamType value for a stream data entry: for a heat
    exchanger only the "Cold Side" mapping is cold; otherwise heaters are
    cold and every other unit operation is hot.
    """
    object_type = stream_data_entry.unitop.objectType
    if object_type == SimulationObjectClass.HeatExchanger:
        if stream_data_entry.property_package_mapping == "Cold Side":
            return StreamType.Cold.value
        return StreamType.Hot.value
    if object_type == SimulationObjectClass.Heater:
        return StreamType.Cold.value
    return StreamType.Hot.value


@api_view_validate
@api_view(['POST'])
def extract_stream_data(request) -> Response:
    """
    Rebuilds the segments of every non-custom StreamDataEntry.

    For each entry the existing segments are deleted, the inlet/outlet
    temperature, pressure and enthalpy are converted to degK, Pa and J/mol,
    the T-H data is requested from the PinchFactory and stored on the entry,
    and the curve is linearized into one or more Segment rows. Entries with
    no inlet stream, or with an unset mole flow, temperature, pressure or
    enthalpy, are skipped (their old segments stay deleted).

    The flowsheet id is read from the "flowsheet" query parameter.

    Returns:
    - Response with status 200 and no body on success.
    - Response with status 500 and a {'status', 'message', 'traceback'} body
      when any exception is raised.
    """
    try:
        flowsheet_id = request.GET.get("flowsheet")
        factory = PinchFactory(flowsheet_id)

        # NOTE(review): the queryset is not filtered by flowsheet_id here -
        # confirm that scoping happens elsewhere (e.g. in the model manager).
        stream_data_entries = StreamDataEntry.objects.filter(custom=False)

        segments = []

        for stream_data_entry in stream_data_entries:
            # Segments are simply deleted and re-created for now.
            stream_data_entry.Segments.all().delete()

            inlet_stream, _outlet_stream = stream_data_entry.inlet_outlet_stream
            if not inlet_stream:
                continue

            # Look the flow property up once and skip entries without a flow
            # value, in line with the other unset-property checks below.
            flow_prop = inlet_stream.properties.get_property("flow_mol")
            flow_value = flow_prop.get_value()
            if flow_value is None:
                continue
            mole_flow = convert_value(flow_value, flow_prop.unit, 'mol/s')

            # SI Units
            inlet_temp, outlet_temp = stream_data_entry.temperatures
            temperatures = _convert_property_pair(
                inlet_temp, outlet_temp, 'degK')
            if temperatures is None:
                continue
            service_T_supply, service_T_target = temperatures

            inlet_pressure, outlet_pressure = stream_data_entry.pressures
            pressures = _convert_property_pair(
                inlet_pressure, outlet_pressure, 'Pa')
            if pressures is None:
                continue
            service_P_supply, service_P_target = pressures

            inlet_enthalpy, outlet_enthalpy = stream_data_entry.enthalpies
            enthalpies = _convert_property_pair(
                inlet_enthalpy, outlet_enthalpy, 'J/mol')
            if enthalpies is None:
                continue
            service_H_supply, service_H_target = enthalpies

            data = [{
                't_supply': service_T_supply,
                't_target': service_T_target,
                'p_supply': service_P_supply,
                'p_target': service_P_target,
                'h_supply': service_H_supply,
                'h_target': service_H_target,
                'composition': get_compounds(inlet_stream),
            }]
            ppKey = stream_data_entry.unitop.get_property_package(
                stream_data_entry.property_package_mapping).propertyPackage

            t_h_data = factory.run_get_t_h_data(data, mole_flow, ppKey)

            # Persist the raw curve and states on the entry before linearizing.
            stream_data_entry.t_h_data = t_h_data
            stream_data_entry.states = t_h_data['states']
            stream_data_entry.save()

            points = factory.run_linearize(
                t_h_data['curve_points'],
                data,
                mole_flow,
                ppKey
            )

            # create segments after linearization
            stream_type = _resolve_stream_type(stream_data_entry)

            if len(points) == 1:
                # Linearization produced no intermediate points: store one
                # segment spanning the whole stream.
                # NOTE(review): temperatures and heat flow are stored here in
                # the properties' own units, while the branch below stores
                # degC and kW - confirm this is intended.
                heat_flow = stream_data_entry.heat_flow.get_value()
                segments.append(Segment(
                    stream_data_entry=stream_data_entry,
                    name=f"{inlet_stream.componentName}",
                    t_supply=inlet_temp.get_value(),
                    t_target=outlet_temp.get_value(),
                    heat_flow=heat_flow,
                    flowsheet=stream_data_entry.flowsheet
                ))
            else:
                # One segment per pair of consecutive points; the code reads
                # index 0 of a point as a heat value in W and index 1 as a
                # temperature in degK.
                for index in range(len(points) - 1):
                    t_supply = convert_value(
                        points[index][1], 'degK', 'degC')
                    t_target = check_target_temperature_validity(
                        t_supply,
                        convert_value(points[index + 1][1], 'degK', 'degC'),
                        stream_type)
                    heat_flow = abs(convert_value(
                        points[index + 1][0] - points[index][0], 'W', 'kW'))
                    segments.append(Segment(
                        stream_data_entry=stream_data_entry,
                        name=f"{inlet_stream.componentName} ({index + 1})",
                        t_supply=t_supply,
                        t_target=t_target,
                        heat_flow=heat_flow,
                        flowsheet=stream_data_entry.flowsheet
                    ))

        Segment.objects.bulk_create(segments)

        return Response(status=200)

    except Exception as e:
        tb_info = traceback.format_exc()
        print(tb_info)
        # NOTE(review): the full traceback is returned to the client, which
        # exposes server internals; the response shape is kept unchanged
        # because callers may rely on it - consider removing it in production.
        response_data = {'status': 'error',
                         'message': str(e), 'traceback': tb_info}
        return Response(response_data, status=500)

201 

def check_target_temperature_validity(t_supply: float, t_target: float, stream_type: StreamType, min_delta_t: float = 0.01) -> float:
    """
    Ensures the target temperature differs from the supply temperature by at
    least min_delta_t.

    Args:
    - t_supply (float): supply temperature of the segment.
    - t_target (float): proposed target temperature of the segment.
    - stream_type: StreamType.Hot or its .value marks a hot stream; anything
      else is treated as cold.
    - min_delta_t (float): smallest allowed |t_supply - t_target|.

    Returns:
    - float: t_target unchanged when the difference is large enough,
      otherwise t_supply - min_delta_t for a hot stream and
      t_supply + min_delta_t for any other stream.
    """
    if abs(t_supply - t_target) < min_delta_t:
        # Callers in this module pass the enum's value; the member itself is
        # accepted too so that it is not silently treated as a cold stream.
        is_hot = stream_type in (StreamType.Hot, StreamType.Hot.value)
        t_target = t_supply - min_delta_t if is_hot else t_supply + min_delta_t
    return t_target