Coverage for backend/django/core/auxiliary/views/ExtractSegmentDataFromFS.py: 87%

132 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1import traceback 

2from typing import Union, Tuple 

3from rest_framework.decorators import api_view 

4from rest_framework.response import Response 

5from drf_spectacular.utils import extend_schema 

6from rest_framework import serializers, status 

7from idaes_factory.unit_conversion.unit_conversion import convert_value 

8from core.validation import api_view_validate 

9from core.managers import get_flowsheet_access 

10 

11from PinchAnalysis.models.InputModels import Segment, StreamDataEntry 

12from PinchAnalysis.models.HenNode import HenNode 

13from PinchAnalysis.models.StreamDataProject import StreamDataProject 

14from PinchAnalysis.views.henNodeHelpers import group_stream_by_unitop_type, set_hennode_connections 

15from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

16from core.auxiliary.enums.pinchEnums import StreamType 

17from core.auxiliary.enums.unitOpData import SimulationObjectClass 

18from pinch_factory.pinch_factory import PinchFactory 

19from django.db.models import Q 

20 

21DECIMAL_PLACES = 3 

22VARIANCE = 0.01 

23 

24 

25def get_compounds(stream, include_null: bool = False) -> set[tuple[str, float]]: 

26 """ 

27 Returns a set of tuples containing the index (key) and value of all property 

28 value objects in the stream's mole_frac_comp. Used for composition comparisons. 

29 

30 Returns: 

31 - set[tuple[str, float]]: A set of (key, value) tuples. 

32 """ 

33 mole_frac_comp = stream.properties.get_property("mole_frac_comp") 

34 property_values = mole_frac_comp.values.all() 

35 result = [ 

36 (prop.get_index("compound").key, prop.value) 

37 for prop in property_values 

38 if (prop.value not in [None, ""] or include_null) 

39 ] 

40 return result 

41 

42 

43def create_he_streams(sim_obj, group) -> None: 

44 streamDataProject = group.flowsheet.StreamDataProject 

45 stream_ls: list[StreamDataEntry] = [] 

46 

47 for key in sim_obj.schema.propertyPackagePorts.keys(): 

48 if key != "__none__": 48 ↛ 47line 48 didn't jump to line 47 because the condition on line 48 was always true

49 stream_ls.append( 

50 StreamDataEntry( 

51 flowsheet=group.flowsheet, 

52 streamDataProject=streamDataProject, 

53 unitop=sim_obj, 

54 group=group, 

55 property_package_mapping=key, 

56 ) 

57 ) 

58 StreamDataEntry.objects.bulk_create(stream_ls) 

59 

60 

61def compare_compositions(stream_1: SimulationObject, stream_2: SimulationObject) -> bool: 

62 """ 

63 Compares the composition of two streams 

64 Returns: boolean indicating equality - True if compositions are equal 

65 """ 

66 stream_1_set = get_compounds(stream_1) 

67 stream_2_set = get_compounds(stream_2) 

68 return stream_1_set == stream_2_set 

69 

70# This needs to be revised and maybe moved elsewhere. The dT is WRONG. also it needs to be ln(dT). 

71# specifically, we need dT of cold segment, and dT of hot segment (pairs from exchanger(?), and we dont have that ehre.) 

72def _calc_area(htc: float, q_kw: float, t_supply_c: float, t_target_c: float) -> float: 

73 U = float(htc or 0) 

74 Q = float(q_kw or 0) 

75 dT = abs(float(t_supply_c or 0) - float(t_target_c or 0)) 

76 if U <= 0 or dT <= 0: 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true

77 return 0.0 

78 return Q / (U * dT) 

79 

80 

81def _get_io_stream_properties(streamDataEntry: StreamDataEntry, prop_arg: str, tar_unit: str) -> Tuple[float,float]: 

82 i, o = getattr(streamDataEntry, prop_arg) 

83 i_val = i.get_value() 

84 o_val = o.get_value() 

85 if i_val is None or o_val is None: 85 ↛ 86line 85 didn't jump to line 86 because the condition on line 85 was never true

86 return None, None 

87 else: 

88 supply = convert_value(i_val, i.unit, tar_unit) 

89 target = convert_value(o_val, o.unit, tar_unit) 

90 return supply, target 

91 

92 

93def _get_stream_type(streamDataEntry: StreamDataEntry): 

94 if streamDataEntry.unitop.objectType == SimulationObjectClass.HeatExchanger: 

95 if streamDataEntry.property_package_mapping == "Cold Side": 

96 stream_type = StreamType.Cold.value 

97 else: 

98 stream_type = StreamType.Hot.value 

99 else: 

100 if streamDataEntry.unitop.objectType == SimulationObjectClass.Heater: 

101 stream_type = StreamType.Cold.value 

102 else: 

103 stream_type = StreamType.Hot.value 

104 return stream_type 

105 

106 

107def _get_terminal_states(streamDataEntry: StreamDataEntry) -> dict: 

108 inlet_stream, _ = streamDataEntry.inlet_outlet_stream 

109 if not inlet_stream: 109 ↛ 110line 109 didn't jump to line 110 because the condition on line 109 was never true

110 return None 

111 

112 mole_flow = convert_value( 

113 inlet_stream.properties.get_property("flow_mol").get_value(), 

114 inlet_stream.properties.get_property("flow_mol").unit, 

115 "mol/s", 

116 ) 

117 service_T_supply, service_T_target = _get_io_stream_properties(streamDataEntry, "temperatures", "degK") 

118 service_P_supply, service_P_target = _get_io_stream_properties(streamDataEntry, "pressures", "Pa") 

119 service_H_supply, service_H_target = _get_io_stream_properties(streamDataEntry, "enthalpies", "J/mol") 

120 

121 if service_T_supply == None or service_P_supply == None or service_H_target == None: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true

122 return None 

123 else: 

124 return { 

125 "streams_io_props": [{ 

126 "t_supply": service_T_supply, 

127 "t_target": service_T_target, 

128 "p_supply": service_P_supply, 

129 "p_target": service_P_target, 

130 "h_supply": service_H_supply, 

131 "h_target": service_H_target, 

132 "composition": get_compounds(inlet_stream), 

133 }], 

134 "ppKey": streamDataEntry.unitop.get_property_package(), 

135 "mole_flow": mole_flow, 

136 "comp_name": inlet_stream.componentName, 

137 "stream_type": _get_stream_type(streamDataEntry), 

138 "streamDataEntry": streamDataEntry, 

139 } 

140 

141 

142def _stream_segment_creator(comp_name: str, points: list, stream_type: str, streamDataEntry: StreamDataEntry, **_): 

143 # Create segments after linearization 

144 segments = [] 

145 for index in range(len(points) - 1): 

146 t_supply = convert_value(points[index][1], "degK", "degC") 

147 t_target = check_target_temperature_validity( 

148 t_supply, 

149 convert_value(points[index + 1][1], "degK", "degC"), 

150 stream_type 

151 ) 

152 heat_flow = abs( 

153 convert_value(points[index + 1][0] - points[index][0], "W", "kW") 

154 ) 

155 htc=1 

156 segments.append( 

157 Segment( 

158 stream_data_entry=streamDataEntry, 

159 name=f"{comp_name} ({index + 1})" if len(points) > 1 else comp_name, 

160 t_supply=t_supply, 

161 t_target=t_target, 

162 heat_flow=heat_flow, 

163 htc=htc, 

164 area=_calc_area(htc,heat_flow, t_supply, t_target), 

165 flowsheet=streamDataEntry.flowsheet, 

166 ) 

167 ) 

168 return segments 

169 

170 

171@api_view_validate 

172@api_view(['POST']) 

173def extract_stream_data(request) -> Response: 

174 try: 

175 flowsheet_id = request.GET.get("flowsheet") 

176 access_state = get_flowsheet_access(request.user, flowsheet_id) 

177 if access_state.has_read_access and not access_state.has_write_access: 

178 return Response( 

179 {"error": "This flowsheet is shared with read-only access."}, 

180 status=status.HTTP_403_FORBIDDEN, 

181 ) 

182 factory = PinchFactory(flowsheet_id) 

183 

184 streamDataEntries = StreamDataEntry.objects.filter(flowsheet_id=flowsheet_id, custom=False) 

185 

186 segments = [] 

187 # i'll just delete hennodes and create new ones for now 

188 HenNode.objects.filter(flowsheet_id=flowsheet_id).delete() 

189 

190 for streamDataEntry in streamDataEntries: 

191 streamDataEntry.Segments.all().delete() 

192 terminal_data = _get_terminal_states(streamDataEntry) 

193 if terminal_data is None: 193 ↛ 194line 193 didn't jump to line 194 because the condition on line 193 was never true

194 continue 

195 terminal_data["prev_states"] = streamDataEntry.states if hasattr(streamDataEntry, "states") else None 

196 t_h_data = factory.run_get_t_h_data(**terminal_data) 

197 

198 streamDataEntry.t_h_data = t_h_data["curve_points"] 

199 streamDataEntry.states = t_h_data["states"] 

200 streamDataEntry.save() 

201 

202 linearised_points = factory.run_linearize(t_h_data["curve_points"], **terminal_data) 

203 new_segments = _stream_segment_creator(points=linearised_points, **terminal_data) 

204 segments.extend(new_segments) 

205 

206 Segment.objects.bulk_create(segments) 

207 

208 # create hennodes 

209 created_segments = list(Segment.objects.filter(id__in=[s.id for s in segments])) 

210 sdes = {seg.stream_data_entry for seg in created_segments if seg.stream_data_entry} 

211 

212 processed_stream_ids = set(HenNode.objects.filter( 

213 Q(stream_data_entry__in=sdes) | 

214 Q(hot_connection__in=sdes) | 

215 Q(cold_connection__in=sdes) 

216 ).values_list('stream_data_entry_id', flat=True)) 

217 

218 hennodes_to_create = [] 

219 

220 # group streams by unitop type 

221 grouped_by_unitop = group_stream_by_unitop_type(sdes, processed_stream_ids, hennodes_to_create) 

222 

223 # set connections for the hennodes 

224 set_hennode_connections(grouped_by_unitop, processed_stream_ids, hennodes_to_create) 

225 

226 HenNode.objects.bulk_create(hennodes_to_create) 

227 

228 # link segments to hennodes 

229 for segment in created_segments: 

230 sde = segment.stream_data_entry 

231 if sde: 231 ↛ 229line 231 didn't jump to line 229 because the condition on line 231 was always true

232 hn = HenNode.objects.filter( 

233 Q(stream_data_entry=sde) | 

234 Q(hot_connection=sde) | 

235 Q(cold_connection=sde) 

236 ).first() 

237 if hn: 237 ↛ 229line 237 didn't jump to line 229 because the condition on line 237 was always true

238 segment.hen_node = hn 

239 segment.save(update_fields=['hen_node']) 

240 

241 return Response(status=200) 

242 

243 except Exception as e: 

244 tb_info = traceback.format_exc() 

245 print(tb_info) 

246 error_message = str(e) 

247 response_data = { 

248 "status": "error", 

249 "message": error_message, 

250 "traceback": tb_info 

251 } 

252 return Response(response_data, status=500) 

253 

254def check_target_temperature_validity(t_supply: float, t_target: float, stream_type: StreamType, min_delta_t: float = 0.0001) -> tuple[float, float]: 

255 if abs(t_supply - t_target) < min_delta_t: 255 ↛ 256line 255 didn't jump to line 256 because the condition on line 255 was never true

256 t_target = t_supply - min_delta_t if stream_type == StreamType.Hot.value else t_supply + min_delta_t 

257 return t_target