Coverage for backend/django/core/auxiliary/views/ExtractSegmentDataFromFS.py: 87%

128 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-12-18 04:00 +0000

1import traceback 

2from typing import Union, Tuple 

3from rest_framework.decorators import api_view 

4from rest_framework.response import Response 

5from drf_spectacular.utils import extend_schema 

6from rest_framework import serializers 

7from idaes_factory.unit_conversion.unit_conversion import convert_value 

8from core.validation import api_view_validate 

9 

10from PinchAnalysis.models.InputModels import Segment, StreamDataEntry 

11from PinchAnalysis.models.HenNode import HenNode 

12from PinchAnalysis.models.StreamDataProject import StreamDataProject 

13from PinchAnalysis.views.henNodeHelpers import group_stream_by_unitop_type, set_hennode_connections 

14from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

15from core.auxiliary.enums.pinchEnums import StreamType 

16from core.auxiliary.enums.unitOpData import SimulationObjectClass 

17from pinch_factory.pinch_factory import PinchFactory 

18from django.db.models import Q 

19 

20DECIMAL_PLACES = 3 

21VARIANCE = 0.01 

22 

23 

24def get_compounds(stream, include_null: bool = False) -> set[tuple[str, float]]: 

25 """ 

26 Returns a set of tuples containing the index (key) and value of all property 

27 value objects in the stream's mole_frac_comp. Used for composition comparisons. 

28 

29 Returns: 

30 - set[tuple[str, float]]: A set of (key, value) tuples. 

31 """ 

32 mole_frac_comp = stream.properties.get_property("mole_frac_comp") 

33 property_values = mole_frac_comp.values.all() 

34 result = [ 

35 (prop.get_index("compound").key, prop.value) 

36 for prop in property_values 

37 if (prop.value not in [None, ""] or include_null) 

38 ] 

39 return result 

40 

41 

42def create_he_streams(sim_obj, group) -> None: 

43 streamDataProject = StreamDataProject.objects.first() 

44 stream_ls: list[StreamDataEntry] = [] 

45 

46 for key in sim_obj.schema.propertyPackagePorts.keys(): 

47 if key != "__none__": 47 ↛ 46line 47 didn't jump to line 46 because the condition on line 47 was always true

48 stream_ls.append( 

49 StreamDataEntry( 

50 flowsheet=group.flowsheet, 

51 streamDataProject=streamDataProject, 

52 unitop=sim_obj, 

53 group=group, 

54 property_package_mapping=key, 

55 ) 

56 ) 

57 StreamDataEntry.objects.bulk_create(stream_ls) 

58 

59 

60def compare_compositions(stream_1: SimulationObject, stream_2: SimulationObject) -> bool: 

61 """ 

62 Compares the composition of two streams 

63 Returns: boolean indicating equality - True if compositions are equal 

64 """ 

65 stream_1_set = get_compounds(stream_1) 

66 stream_2_set = get_compounds(stream_2) 

67 return stream_1_set == stream_2_set 

68 

69# This needs to be revised and maybe moved elsewhere. The dT is WRONG. also it needs to be ln(dT). 

70# specifically, we need dT of cold segment, and dT of hot segment (pairs from exchanger(?), and we dont have that ehre.) 

71def _calc_area(htc: float, q_kw: float, t_supply_c: float, t_target_c: float) -> float: 

72 U = float(htc or 0) 

73 Q = float(q_kw or 0) 

74 dT = abs(float(t_supply_c or 0) - float(t_target_c or 0)) 

75 if U <= 0 or dT <= 0: 75 ↛ 76line 75 didn't jump to line 76 because the condition on line 75 was never true

76 return 0.0 

77 return Q / (U * dT) 

78 

79 

80def _get_io_stream_properties(streamDataEntry: StreamDataEntry, prop_arg: str, tar_unit: str) -> Tuple[float,float]: 

81 i, o = getattr(streamDataEntry, prop_arg) 

82 i_val = i.get_value() 

83 o_val = o.get_value() 

84 if i_val is None or o_val is None: 84 ↛ 85line 84 didn't jump to line 85 because the condition on line 84 was never true

85 return None, None 

86 else: 

87 supply = convert_value(i_val, i.unit, tar_unit) 

88 target = convert_value(o_val, o.unit, tar_unit) 

89 return supply, target 

90 

91 

92def _get_stream_type(streamDataEntry: StreamDataEntry): 

93 if streamDataEntry.unitop.objectType == SimulationObjectClass.HeatExchanger: 

94 if streamDataEntry.property_package_mapping == "Cold Side": 

95 stream_type = StreamType.Cold.value 

96 else: 

97 stream_type = StreamType.Hot.value 

98 else: 

99 if streamDataEntry.unitop.objectType == SimulationObjectClass.Heater: 

100 stream_type = StreamType.Cold.value 

101 else: 

102 stream_type = StreamType.Hot.value 

103 return stream_type 

104 

105 

106def _get_terminal_states(streamDataEntry: StreamDataEntry) -> dict: 

107 inlet_stream, _ = streamDataEntry.inlet_outlet_stream 

108 if not inlet_stream: 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true

109 return None 

110 

111 mole_flow = convert_value( 

112 inlet_stream.properties.get_property("flow_mol").get_value(), 

113 inlet_stream.properties.get_property("flow_mol").unit, 

114 "mol/s", 

115 ) 

116 service_T_supply, service_T_target = _get_io_stream_properties(streamDataEntry, "temperatures", "degK") 

117 service_P_supply, service_P_target = _get_io_stream_properties(streamDataEntry, "pressures", "Pa") 

118 service_H_supply, service_H_target = _get_io_stream_properties(streamDataEntry, "enthalpies", "J/mol") 

119 

120 if service_T_supply == None or service_P_supply == None or service_H_target == None: 120 ↛ 121line 120 didn't jump to line 121 because the condition on line 120 was never true

121 return None 

122 else: 

123 return { 

124 "streams_io_props": [{ 

125 "t_supply": service_T_supply, 

126 "t_target": service_T_target, 

127 "p_supply": service_P_supply, 

128 "p_target": service_P_target, 

129 "h_supply": service_H_supply, 

130 "h_target": service_H_target, 

131 "composition": get_compounds(inlet_stream), 

132 }], 

133 "ppKey": streamDataEntry.unitop.get_property_package(streamDataEntry.property_package_mapping).propertyPackage, 

134 "mole_flow": mole_flow, 

135 "comp_name": inlet_stream.componentName, 

136 "stream_type": _get_stream_type(streamDataEntry), 

137 "streamDataEntry": streamDataEntry, 

138 } 

139 

140 

141def _stream_segment_creator(comp_name: str, points: list, stream_type: str, streamDataEntry: StreamDataEntry, **_): 

142 # Create segments after linearization 

143 segments = [] 

144 for index in range(len(points) - 1): 

145 t_supply = convert_value(points[index][1], "degK", "degC") 

146 t_target = check_target_temperature_validity( 

147 t_supply, 

148 convert_value(points[index + 1][1], "degK", "degC"), 

149 stream_type 

150 ) 

151 heat_flow = abs( 

152 convert_value(points[index + 1][0] - points[index][0], "W", "kW") 

153 ) 

154 htc=1 

155 segments.append( 

156 Segment( 

157 stream_data_entry=streamDataEntry, 

158 name=f"{comp_name} ({index + 1})" if len(points) > 1 else comp_name, 

159 t_supply=t_supply, 

160 t_target=t_target, 

161 heat_flow=heat_flow, 

162 htc=htc, 

163 area=_calc_area(htc,heat_flow, t_supply, t_target), 

164 flowsheet=streamDataEntry.flowsheet, 

165 ) 

166 ) 

167 return segments 

168 

169 

170@api_view_validate 

171@api_view(['POST']) 

172def extract_stream_data(request) -> Response: 

173 try: 

174 flowsheet_id = request.GET.get("flowsheet") 

175 factory = PinchFactory(flowsheet_id) 

176 

177 streamDataEntries = StreamDataEntry.objects.filter(custom=False) 

178 

179 segments = [] 

180 # i'll just delete hennodes and create new ones for now 

181 HenNode.objects.filter(flowsheet_id=flowsheet_id).delete() 

182 

183 for streamDataEntry in streamDataEntries: 

184 streamDataEntry.Segments.all().delete() 

185 terminal_data = _get_terminal_states(streamDataEntry) 

186 if terminal_data is None: 186 ↛ 187line 186 didn't jump to line 187 because the condition on line 186 was never true

187 continue 

188 terminal_data["prev_states"] = streamDataEntry.states if hasattr(streamDataEntry, "states") else None 

189 t_h_data = factory.run_get_t_h_data(**terminal_data) 

190 

191 streamDataEntry.t_h_data = t_h_data["curve_points"] 

192 streamDataEntry.states = t_h_data["states"] 

193 streamDataEntry.save() 

194 

195 linearised_points = factory.run_linearize(t_h_data["curve_points"], **terminal_data) 

196 new_segments = _stream_segment_creator(points=linearised_points, **terminal_data) 

197 segments.extend(new_segments) 

198 

199 Segment.objects.bulk_create(segments) 

200 

201 # create hennodes 

202 created_segments = list(Segment.objects.filter(id__in=[s.id for s in segments])) 

203 sdes = {seg.stream_data_entry for seg in created_segments if seg.stream_data_entry} 

204 

205 processed_stream_ids = set(HenNode.objects.filter( 

206 Q(stream_data_entry__in=sdes) | 

207 Q(hot_connection__in=sdes) | 

208 Q(cold_connection__in=sdes) 

209 ).values_list('stream_data_entry_id', flat=True)) 

210 

211 hennodes_to_create = [] 

212 

213 # group streams by unitop type 

214 grouped_by_unitop = group_stream_by_unitop_type(sdes, processed_stream_ids, hennodes_to_create) 

215 

216 # set connections for the hennodes 

217 set_hennode_connections(grouped_by_unitop, processed_stream_ids, hennodes_to_create) 

218 

219 HenNode.objects.bulk_create(hennodes_to_create) 

220 

221 # link segments to hennodes 

222 for segment in created_segments: 

223 sde = segment.stream_data_entry 

224 if sde: 224 ↛ 222line 224 didn't jump to line 222 because the condition on line 224 was always true

225 hn = HenNode.objects.filter( 

226 Q(stream_data_entry=sde) | 

227 Q(hot_connection=sde) | 

228 Q(cold_connection=sde) 

229 ).first() 

230 if hn: 230 ↛ 222line 230 didn't jump to line 222 because the condition on line 230 was always true

231 segment.hen_node = hn 

232 segment.save(update_fields=['hen_node']) 

233 

234 return Response(status=200) 

235 

236 except Exception as e: 

237 tb_info = traceback.format_exc() 

238 print(tb_info) 

239 error_message = str(e) 

240 response_data = { 

241 "status": "error", 

242 "message": error_message, 

243 "traceback": tb_info 

244 } 

245 return Response(response_data, status=500) 

246 

247def check_target_temperature_validity(t_supply: float, t_target: float, stream_type: StreamType, min_delta_t: float = 0.0001) -> tuple[float, float]: 

248 if abs(t_supply - t_target) < min_delta_t: 248 ↛ 249line 248 didn't jump to line 249 because the condition on line 248 was never true

249 t_target = t_supply - min_delta_t if stream_type == StreamType.Hot.value else t_supply + min_delta_t 

250 return t_target