Coverage for backend/pinch_service/OpenPinch/src/analysis/problem_table_analysis.py: 95%

127 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1import numpy as np 

2from typing import Tuple 

3from ..utils.water_properties import Tsat_p 

4from ..lib import * 

5from ..classes import StreamCollection, ProblemTable, Stream 

6from .support_methods import insert_temperature_interval_into_pt, linear_interpolation 

7 

8__all__ = ["problem_table_algorithm, calc_problem_table"] 

9 

10 

11####################################################################################################### 

12# Public API 

13####################################################################################################### 

14 

15def problem_table_algorithm(hot_streams: StreamCollection, cold_streams: StreamCollection, all_streams: StreamCollection, config: Configuration) -> Tuple[ProblemTable, ProblemTable, dict]: 

16 """Perform the problem table algorithm for a given set of hot and cold streams.""" 

17 # Get all possible temperature intervals, remove duplicates and order from high to low 

18 pt, pt_real = _get_temperature_intervals(streams=all_streams, config=config) 

19 

20 # Perform the heat cascade of the problem table 

21 pt, pt_real = calc_problem_table(pt, hot_streams, cold_streams), calc_problem_table(pt_real, hot_streams, cold_streams, False) 

22 target_values = _set_zonal_targets(pt, pt_real) 

23 

24 # Correct the location of the cold composite curve and limiting GCC (real temperatures) 

25 pt_real = _correct_pt_composite_curves(pt_real, target_values["heat_recovery_target"], target_values["heat_recovery_limit"]) 

26 

27 # Add additional temperature intervals for ease in targeting utility etc 

28 pt, pt_real = _add_temperature_intervals_at_constant_h(target_values["heat_recovery_target"], pt, pt_real) 

29 

30 return pt, pt_real, target_values 

31 

32 

33def calc_problem_table(pt: ProblemTable, hot_streams: StreamCollection = None, cold_streams: StreamCollection = None, shifted: bool = True) -> ProblemTable: 

34 """Fast calculation of the problem table using vectorized operations.""" 

35 

36 # If streams are provided, calculate CP and RCP contributions per temperature interval 

37 if hot_streams is not None and cold_streams is not None: 37 ↛ 47line 37 didn't jump to line 47 because the condition on line 37 was always true

38 cp_hot, rcp_hot, cp_cold, rcp_cold = _sum_mcp_between_temperature_boundaries( 

39 pt.col[PT.T.value], hot_streams, cold_streams, shifted 

40 ) 

41 pt.col[PT.CP_HOT.value] = cp_hot 

42 pt.col[PT.RCP_HOT.value] = rcp_hot 

43 pt.col[PT.CP_COLD.value] = cp_cold 

44 pt.col[PT.RCP_COLD.value] = rcp_cold 

45 

46 # Extract numeric arrays for fast computation 

47 T_col = pt.col[PT.T.value] 

48 cp_hot = pt.col[PT.CP_HOT.value] 

49 cp_cold = pt.col[PT.CP_COLD.value] 

50 

51 # ΔT = T_i - T_i+1, with first value set to 0.0 

52 delta_T = np.empty_like(T_col) 

53 delta_T[0] = 0.0 

54 delta_T[1:] = T_col[:-1] - T_col[1:] 

55 pt.col[PT.DELTA_T.value] = delta_T 

56 

57 # ΔH_HOT = ΔT * CP_HOT 

58 delta_H_hot = delta_T * cp_hot 

59 H_hot = np.cumsum(delta_H_hot) 

60 pt.col[PT.DELTA_H_HOT.value] = delta_H_hot 

61 pt.col[PT.H_HOT.value] = H_hot 

62 

63 # ΔH_COLD = ΔT * CP_COLD 

64 delta_H_cold = delta_T * cp_cold 

65 H_cold = np.cumsum(delta_H_cold) 

66 pt.col[PT.DELTA_H_COLD.value] = delta_H_cold 

67 pt.col[PT.H_COLD.value] = H_cold 

68 

69 # MCP_NET = CP_COLD - CP_HOT 

70 mcp_net = cp_cold - cp_hot 

71 pt.col[PT.MCP_NET.value] = mcp_net 

72 

73 # ΔH_NET = ΔT * MCP_NET 

74 delta_H_net = delta_T * mcp_net 

75 H_net = -np.cumsum(delta_H_net) 

76 pt.col[PT.DELTA_H_NET.value] = delta_H_net 

77 pt.col[PT.H_NET.value] = H_net 

78 

79 # Find minimum H_NET (for cascade shifting) 

80 min_H = H_net.min() 

81 

82 # Shift the composite curves for alignment 

83 pt.col[PT.H_HOT.value] = H_hot[-1] - H_hot 

84 pt.col[PT.H_COLD.value] = H_cold[-1] + (H_net[-1] - min_H) - H_cold 

85 pt.col[PT.H_NET.value] = H_net - min_H 

86 

87 return pt 

88 

89 

90####################################################################################################### 

91# Helper functions 

92####################################################################################################### 

93 

94def _get_temperature_intervals(streams: List[Stream] = [], config: Configuration = None) -> Tuple[ProblemTable, ProblemTable]: 

95 """Returns unordered T and T* intervals for given streams and utilities.""" 

96 

97 T_star = [t for s in streams for t in (s.t_min_star, s.t_max_star)] 

98 T = [t for s in streams for t in (s.t_min, s.t_max)] 

99 

100 if isinstance(config, Configuration): 100 ↛ 110line 100 didn't jump to line 110 because the condition on line 100 was always true

101 if config.TURBINE_WORK_BUTTON: 101 ↛ 102line 101 didn't jump to line 102 because the condition on line 101 was never true

102 T_val = config.T_TURBINE_BOX 

103 Tsat_val = Tsat_p(config.P_TURBINE_BOX) 

104 T_star.extend([T_val, Tsat_val]) 

105 T.extend([T_val, Tsat_val]) 

106 

107 T_star.append(config.TEMP_REF) 

108 T.append(config.TEMP_REF) 

109 

110 pt = ProblemTable({PT.T.value: sorted(set(T_star), reverse=True)}) 

111 pt_real = ProblemTable({PT.T.value: sorted(set(T), reverse=True)}) 

112 

113 return pt, pt_real 

114 

115 

116def _sum_mcp_between_temperature_boundaries( 

117 temperatures: List[float], 

118 hot_streams: List[Stream], 

119 cold_streams: List[Stream], 

120 shifted: bool = True, 

121) -> Tuple[List[float], List[float], List[float], List[float]]: 

122 """Vectorized CP and rCP summation across temperature intervals.""" 

123 

124 def calc_active_matrix(streams: List[Stream], use_shifted: bool) -> np.ndarray: 

125 t_min = np.array([s.t_min_star if use_shifted else s.t_min for s in streams]) 

126 t_max = np.array([s.t_max_star if use_shifted else s.t_max for s in streams]) 

127 

128 lower_bounds = np.array(temperatures[1:]) 

129 upper_bounds = np.array(temperatures[:-1]) 

130 

131 # Shape: (intervals, streams) 

132 active = (t_max[np.newaxis, :] > lower_bounds[:, np.newaxis] + ZERO) & \ 

133 (t_min[np.newaxis, :] < upper_bounds[:, np.newaxis] - ZERO) 

134 

135 return active 

136 

137 def sum_cp_rcp(streams: List[Stream], active: np.ndarray): 

138 cp = np.array([s.CP for s in streams]) 

139 rcp = np.array([s.rCP for s in streams]) 

140 cp_sum = active @ cp 

141 rcp_sum = active @ rcp 

142 return np.insert(cp_sum, 0, 0.0), np.insert(rcp_sum, 0, 0.0) 

143 

144 hot_active = calc_active_matrix(hot_streams, shifted) 

145 cold_active = calc_active_matrix(cold_streams, shifted) 

146 

147 cp_hot, rcp_hot = sum_cp_rcp(hot_streams, hot_active) 

148 cp_cold, rcp_cold = sum_cp_rcp(cold_streams, cold_active) 

149 

150 return cp_hot.tolist(), rcp_hot.tolist(), cp_cold.tolist(), rcp_cold.tolist() 

151 

152 

153def _correct_pt_composite_curves(pt: ProblemTable, heat_recovery_target: float, heat_recovery_limit: float) -> ProblemTable: 

154 """Shift H_COLD and H_NET columns to match targeted heat recovery levels.""" 

155 delta_shift = heat_recovery_limit - heat_recovery_target 

156 pt.col[PT.H_COLD.value] += delta_shift 

157 pt.col[PT.H_NET.value] += delta_shift 

158 return pt 

159 

160 

161def _add_temperature_intervals_at_constant_h(heat_recovery_target: float, pt: ProblemTable, pt_real: ProblemTable): 

162 if heat_recovery_target > ZERO: 

163 pt = _insert_temperature_interval_into_pt_at_constant_h(pt, PT.T.value, PT.H_HOT.value, PT.H_COLD.value) 

164 pt_real = _insert_temperature_interval_into_pt_at_constant_h(pt_real, PT.T.value, PT.H_HOT.value, PT.H_COLD.value) 

165 return pt, pt_real 

166 

167 

168def _insert_temperature_interval_into_pt_at_constant_h(pt: ProblemTable, col_T: str =PT.T.value, col_HCC: str =PT.H_HOT.value, col_CCC: str =PT.H_COLD.value) -> ProblemTable: 

169 """Insert temperature intervals into the process table where HCC and CCC intersect at constant enthalpy.""" 

170 # --- HCC to CCC projection --- 

171 i = _get_composite_curve_starting_points_loc(pt, col_HCC, hcc=True) 

172 pt = _get_T_value_at_cc_starts(pt, i, col_T, col_HCC, col_CCC, hcc=True) 

173 # --- CCC to HCC projection --- 

174 i = _get_composite_curve_starting_points_loc(pt, col_CCC, hcc=False) 

175 pt = _get_T_value_at_cc_starts(pt, i, col_T, col_HCC, col_CCC, hcc=False) 

176 return pt 

177 

178 

179def _get_composite_curve_starting_points_loc(pt: ProblemTable, col: str = PT.H_HOT.value, hcc: bool = True) -> int: 

180 """Find the starting index of the composite curve to be projected.""" 

181 i_range = range(1, len(pt)) if hcc else range(len(pt) - 1, 0, -1) 

182 for i in i_range: 182 ↛ 185line 182 didn't jump to line 185 because the loop on line 182 didn't complete

183 if pt.loc[i - 1, col] > pt.loc[i, col] + ZERO: 

184 break 

185 return i - 1 if hcc else i 

186 

187 

188def _get_T_value_at_cc_starts(pt: ProblemTable, start_index: int, col_T: str =PT.T.value, col_HCC: str =PT.H_HOT.value, col_CCC: str =PT.H_COLD.value, hcc: bool =True) -> ProblemTable: 

189 """Find and insert the temperature value where composite curves intersect at constant enthalpy.""" 

190 col0, col1 = (col_HCC, col_CCC) if hcc else (col_CCC, col_HCC) 

191 

192 # T_insert_vals = [] 

193 i = start_index 

194 k = 0 

195 while 0 < i < len(pt) and k < 2: 

196 h_0 = pt.loc[i, col0] 

197 j_range = range(len(pt) - 1, 0, -1) if hcc else range(1, len(pt) - 1) 

198 

199 for j in j_range: 

200 if abs(pt.loc[j, col1] - h_0) < ZERO: 

201 k += 1 

202 break 

203 

204 z = 0 if hcc else 1 

205 h_j = pt.loc[j + z, col1] 

206 h_jm1 = pt.loc[j - 1 + z, col1] 

207 

208 if h_0 < h_jm1 - ZERO and h_0 > h_j + ZERO: 

209 t_j = pt.loc[j + z, col_T] 

210 t_jm1 = pt.loc[j - 1 + z, col_T] 

211 T_C = linear_interpolation(h_0, h_j, h_jm1, t_j, t_jm1) 

212 # T_insert_vals.append(T_C) 

213 pt, n_int_added = insert_temperature_interval_into_pt(pt, [T_C]) 

214 k += n_int_added 

215 break 

216 i = j 

217 

218 # for T_C in T_insert_vals: 

219 # pt = insert_temperature_interval_into_pt(pt, T_C) 

220 

221 return pt 

222 

223 

224def _set_zonal_targets(pt: ProblemTable, pt_real: ProblemTable) -> dict: 

225 """Assign thermal targets and integration degree to the zone based on process table data.""" 

226 return { 

227 "hot_utility_target": pt.loc[0, PT.H_NET.value], 

228 "cold_utility_target": pt.loc[-1, PT.H_NET.value], 

229 "heat_recovery_target": pt.loc[0, PT.H_HOT.value] - pt.loc[-1, PT.H_NET.value], 

230 "heat_recovery_limit": pt_real.loc[0, PT.H_HOT.value] - pt_real.loc[-1, PT.H_NET.value], 

231 "degree_of_int": ( 

232 (pt.loc[0, PT.H_HOT.value] - pt.loc[-1, PT.H_NET.value]) / (pt_real.loc[0, PT.H_HOT.value] - pt_real.loc[-1, PT.H_NET.value]) 

233 if (pt_real.loc[0, PT.H_HOT.value] - pt_real.loc[-1, PT.H_NET.value]) > 0 else 1.0 

234 ) 

235 }