Coverage for backend/pinch_service/OpenPinch/src/analysis/utility_targeting.py: 95%

161 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1import numpy as np 

2from ..utils import * 

3from ..lib.enums import * 

4from ..classes import * 

5from .support_methods import * 

6from .problem_table_analysis import calc_problem_table 

7 

8__all__ = ["get_zonal_utility_targets, target_utility, calc_GGC_utility"] 

9 

10####################################################################################################### 

11# Public API 

12####################################################################################################### 

13 

14def get_zonal_utility_targets(pt: ProblemTable, pt_real: ProblemTable, hot_utilities: StreamCollection, cold_utilities: StreamCollection, is_process_zone = True) -> Tuple[ProblemTable, ProblemTable, StreamCollection, StreamCollection, StreamCollection, StreamCollection]: 

15 # Calculate various GCC profiles 

16 if is_process_zone: 16 ↛ 18line 16 didn't jump to line 18 because the condition on line 16 was always true

17 pt = _calc_GCC_without_pockets(pt) 

18 pt = _calc_GCC_with_vertical_heat_transfer(pt) 

19 pt = _calc_GCC_actual(pt, is_process_zone) 

20 pt = _calc_GGC_pockets(pt) 

21 # Add assisted integration targeting here... 

22 pt = _calc_seperated_heat_load_profiles(pt, col_H_net=PT.H_NET_A.value) 

23 

24 # Target multiple utility use 

25 if is_process_zone: 25 ↛ 29line 25 didn't jump to line 29 because the condition on line 25 was always true

26 hot_utilities = target_utility(hot_utilities, pt, PT.T.value, PT.H_COLD_NET.value) 

27 cold_utilities = target_utility(cold_utilities, pt, PT.T.value, PT.H_HOT_NET.value) 

28 

29 pt = calc_GGC_utility(pt, hot_utilities, cold_utilities, shifted=True) 

30 pt = _calc_seperated_heat_load_profiles(pt, col_H_net=PT.H_UT_NET.value, col_H_cold_net=PT.H_COLD_UT.value, col_H_hot_net=PT.H_HOT_UT.value) 

31 

32 pt_real = calc_GGC_utility(pt_real, hot_utilities, cold_utilities, shifted=False) 

33 pt_real = _calc_seperated_heat_load_profiles(pt_real, col_H_net=PT.H_UT_NET.value, col_H_cold_net=PT.H_COLD_UT.value, col_H_hot_net=PT.H_HOT_UT.value) 

34 pt_real = _calc_balanced_CC(pt_real) 

35 

36 return pt, pt_real, hot_utilities, cold_utilities 

37 

38 

39def target_utility(utilities: List[Stream], pt: ProblemTable, col_T: str, col_H: str, real_T=False) -> List[Stream]: 

40 """Targets multiple utility use considering a fixed target temperature.""" 

41 if len(utilities) == 0: 41 ↛ 42line 41 didn't jump to line 42 because the condition on line 41 was never true

42 return utilities 

43 

44 pt = pt.copy 

45 pt.round(6) 

46 hot_pinch_row, cold_pinch_row, _ = get_pinch_loc(pt, col_H) 

47 

48 if pt.col[col_H].min() < -ZERO: 

49 pt.col[col_H] = pt.col[col_H] * -1 

50 

51 if utilities[0].type == StreamType.Hot.value and abs(pt.loc[0, col_H]) > ZERO: 

52 utilities = _assign_utility(pt, col_T, col_H, utilities, hot_pinch_row, is_hot_ut=True, real_T=real_T) 

53 

54 elif utilities[0].type == StreamType.Cold.value and abs(pt.loc[-1, col_H]) > ZERO: 

55 utilities = _assign_utility(pt, col_T, col_H, utilities, cold_pinch_row, is_hot_ut=False, real_T=real_T) 

56 

57 return utilities 

58 

59 

60def calc_GGC_utility(pt: ProblemTable, hot_utilities: List[Stream], cold_utilities: List[Stream], shifted: bool = True) -> ProblemTable: 

61 """Returns the GCC profile for utility use of a process as a DataFrame.""" 

62 pt_temp = calc_problem_table( 

63 ProblemTable({PT.T.value: pt.col[PT.T.value]}), 

64 hot_utilities, cold_utilities, shifted 

65 ) 

66 pt.col[PT.H_UT_NET.value] = pt_temp.col[PT.H_NET.value].max() - pt_temp.col[PT.H_NET.value] 

67 pt.col[PT.RCP_HOT_UT.value] = pt_temp.col[PT.RCP_HOT.value] 

68 pt.col[PT.RCP_COLD_UT.value] = pt_temp.col[PT.RCP_COLD.value] 

69 pt.col[PT.RCP_UT_NET.value] = pt_temp.col[PT.RCP_HOT.value] + pt_temp.col[PT.RCP_COLD.value] 

70 return pt 

71 

72 

73####################################################################################################### 

74# Helper functions: get_zonal_utility_targets 

75####################################################################################################### 

76 

77def _calc_GCC_without_pockets(pt: ProblemTable, col_H_NP: str =PT.H_NET_NP.value, col_H: str =PT.H_NET.value) -> Tuple[ProblemTable, ProblemTable]: 

78 pt.col[col_H_NP] = pt.col[col_H] 

79 

80 hot_pinch_loc, cold_pinch_loc, valid = get_pinch_loc(pt) 

81 if not valid: 

82 return pt 

83 

84 # Remove any possible pocket segments between the pinches 

85 if hot_pinch_loc + 1 < cold_pinch_loc: 

86 for j in range(hot_pinch_loc + 1, cold_pinch_loc): 

87 pt.loc[j, col_H_NP] = 0 

88 

89 # Remove pocket segments above the Pinch 

90 pt, hot_pinch_loc, cold_pinch_loc = _remove_pockets_on_one_side_of_the_pinch(pt, col_H_NP, col_H, hot_pinch_loc, cold_pinch_loc, True) 

91 

92 # Remove pocket segments below the Pinch 

93 pt, hot_pinch_loc, cold_pinch_loc = _remove_pockets_on_one_side_of_the_pinch(pt, col_H_NP, col_H, hot_pinch_loc, cold_pinch_loc, False) 

94 

95 return pt 

96 

97 

98def _remove_pockets_on_one_side_of_the_pinch(pt: ProblemTable, col_H_NP: str =PT.H_NET_NP.value, col_H: str =PT.H_NET.value, hot_pinch_loc: int = None, cold_pinch_loc: int = None, is_above_pinch: bool = True) -> Tuple[ProblemTable, ProblemTable]: 

99 

100 # Settings for removing pocket segments for above/below the pinch 

101 if is_above_pinch: 

102 i = 0 

103 pinch_loc = hot_pinch_loc 

104 sgn = 1 

105 else: 

106 i = len(pt) - 1 

107 pinch_loc = cold_pinch_loc 

108 sgn = -1 

109 

110 T_vals, H_vals, H_NP_vals = pt.col[PT.T.value], pt.col[col_H], pt.col[col_H_NP] 

111 

112 if H_vals[i] < ZERO: 

113 # No heating or cooling required 

114 return pt, hot_pinch_loc, cold_pinch_loc 

115 

116 for _ in range(i, pinch_loc, sgn): 116 ↛ 145line 116 didn't jump to line 145 because the loop on line 116 didn't complete

117 di = sgn 

118 n_int_added = 0 

119 if H_vals[i] < H_vals[i + sgn] - ZERO: 

120 i_0 = i 

121 i = _pocket_exit_index(H_vals, i, pinch_loc, sgn) 

122 

123 if i != pinch_loc: 

124 T0 = linear_interpolation(H_vals[i_0], H_vals[i], H_vals[i + sgn], T_vals[i], T_vals[i + sgn]) 

125 pt, n_int_added = insert_temperature_interval_into_pt(pt, T0) 

126 

127 if n_int_added > 0: 

128 T_vals, H_vals, H_NP_vals = pt.col[PT.T.value], pt.col[col_H], pt.col[col_H_NP] 

129 if is_above_pinch: 

130 hot_pinch_loc += n_int_added 

131 cold_pinch_loc += n_int_added 

132 else: 

133 i_0 += n_int_added 

134 

135 j_rng = range(i_0 + 1, i + 1) if is_above_pinch else range(i + 1, i_0) 

136 for j in j_rng: 

137 H_NP_vals[j] = H_vals[i_0] 

138 

139 di = n_int_added * sgn 

140 

141 i += di 

142 if (pinch_loc - i) * sgn <= 0: 

143 break 

144 

145 return pt, hot_pinch_loc, cold_pinch_loc 

146 

147 

148def _pocket_exit_index(H_vals: np.ndarray, i_0: int, pinch_loc: int, sgn: int) -> int: 

149 if sgn > 0: 

150 for i in range(i_0 + 1, pinch_loc + 1): 

151 if H_vals[i_0] >= H_vals[i] + ZERO: 

152 return i - 1 

153 return pinch_loc 

154 else: 

155 for i in range(i_0 - 1, pinch_loc - 1, -1): 155 ↛ 158line 155 didn't jump to line 158 because the loop on line 155 didn't complete

156 if H_vals[i_0] >= H_vals[i] + ZERO: 

157 return i + 1 

158 return pinch_loc 

159 

160 

161def _calc_GCC_with_vertical_heat_transfer(pt: ProblemTable) -> ProblemTable: 

162 """Returns the extreme GCC where heat transfer on the composite curves is vertical (not horizontal).""" 

163 # Top section of the vGCC 

164 hcc_max = pt.loc[0, PT.H_HOT.value] 

165 pt.col[PT.H_NET_V.value] = np.where(pt.col[PT.H_COLD.value] > hcc_max, pt.col[PT.H_COLD.value] - hcc_max, 0.0) 

166 # Bottom section of the vGCC 

167 cu_tar = pt.loc[-1, PT.H_NET.value] 

168 pt.col[PT.H_NET_V.value] = np.where(pt.col[PT.H_HOT.value] < cu_tar, cu_tar - pt.col[PT.H_HOT.value], pt.col[PT.H_NET_V.value]) 

169 return pt 

170 

171 

172def _calc_GCC_actual(pt: ProblemTable, is_process_zone: bool = True, f_horizontal_HT: float = 1.0) -> ProblemTable: 

173 """Return the actual GCC based on utility usage and heat transfer direction settings.""" 

174 pt.col[PT.H_NET_A.value] = pt.col[PT.H_NET_NP.value] if is_process_zone else pt.col[PT.H_NET.value] 

175 pt.col[PT.H_NET_A.value] = pt.col[PT.H_NET_NP.value] * f_horizontal_HT + pt.col[PT.H_NET_V.value] * (1 - f_horizontal_HT) 

176 return pt 

177 

178 

179def _calc_GGC_pockets(pt: ProblemTable) -> ProblemTable: 

180 pt.col[PT.H_NET_PK.value] = pt.col[PT.H_NET.value] - pt.col[PT.H_NET_NP.value] 

181 return pt 

182 

183 

184def _calc_seperated_heat_load_profiles( 

185 pt: ProblemTable, 

186 col_H_net: str = PT.H_NET_A.value, 

187 col_H_hot_net: str = PT.H_HOT_NET.value, 

188 col_H_cold_net: str = PT.H_COLD_NET.value, 

189 col_RCP_net: str = PT.RCP_UT_NET.value, 

190 col_RCP_hot_net: str = PT.RCP_HOT_UT.value, 

191 col_RCP_cold_net: str = PT.RCP_COLD_UT.value, 

192 ) -> ProblemTable: 

193 """Determines the gross required heating or cooling profile of a system from the GCC.""" 

194 

195 # Calculate ΔH differences 

196 dh_diff = pt.delta_col(col_H_net, 1) 

197 

198 # Determine whether each row corresponds to a hot-side or cold-side enthalpy change 

199 is_hot = dh_diff <= 0 

200 is_cold = ~is_hot 

201 

202 # Compute cumulative enthalpy change 

203 pt.col[col_H_hot_net] = np.cumsum(-dh_diff * is_hot) 

204 pt.col[col_H_cold_net] = np.cumsum(-dh_diff * is_cold) 

205 

206 # Handle RCP (HTR x CP) 

207 pt.col[col_RCP_hot_net] = pt.col[col_RCP_net] * is_hot 

208 pt.col[col_RCP_cold_net] = pt.col[col_RCP_net] * is_cold 

209 

210 # Normalize HU profile so it starts at zero 

211 pt.col[col_H_hot_net] *= -1 

212 HUt_max = -pt.loc[-1, col_H_cold_net] 

213 pt.col[col_H_cold_net] += HUt_max 

214 

215 return pt 

216 

217 

218def _calc_balanced_CC(pt: ProblemTable) -> ProblemTable: 

219 """Creates the balanced Composite Curve (CC) using both process and utility streams.""" 

220 

221 pt.col[PT.H_HOT_BAL.value] = pt.col[PT.H_HOT.value] + pt.col[PT.H_HOT_UT.value] 

222 pt.col[PT.H_COLD_BAL.value] = pt.col[PT.H_COLD.value] + pt.col[PT.H_COLD_UT.value] 

223 

224 return pt 

225 

226 

227####################################################################################################### 

228# Helper functions: assisted integration 

229####################################################################################################### 

230 

231def _calc_GCC_assisted_integration(pt: ProblemTable, dt_cut: float = 10, dt_cut_min: float = 0) -> ProblemTable: 

232 """Modify PT in-place to reflect assisted GCC and return the GCC_AI result.""" 

233 

234 # pt.col[PT.H_NET_PK.value] = pt.col[PT.H_NET.value] - pt.col[PT.H_NET_NP.value] 

235 

236 # if np.sum(pt.col[PT.H_NET_PK.value]) < ZERO * len(pt): 

237 # pt.col[PT.H_NET_AI.value] = pt.col[PT.H_NET.value] 

238 # return pt 

239 

240 # i = len(pt) 

241 # while i > 0: 

242 # if pt.loc[i - 1, PT.H_NET_PK.value] > ZERO: 

243 # i_lb = i 

244 # for i in range(i, 0, -1): 

245 # if pt.loc[i, PT.H_NET_PK.value] < ZERO: 

246 # break 

247 # i_ub = i 

248 # _compute_pocket_temperature_differences 

249 

250 # else: 

251 # i += 1 

252 

253 # pt.col[PT.H_NET_AI.value] = pt.col[PT.H_NET.value] - pt.col[PT.H_NET_PK.value] 

254 return pt 

255 

256 

257####################################################################################################### 

258# Helper functions: target_utility 

259####################################################################################################### 

260 

261def _assign_utility(pt: ProblemTable, col_T: Enum, col_H: Enum, u_ls: List[Stream], pinch_row: int, is_hot_ut: bool, real_T: bool) -> List[Stream]: 

262 """Assigns utility heat duties based on vertical heat transfer across a pinch.""" 

263 hl = ProblemTable({ 

264 col_T: pt.col[col_T], 

265 col_H: pt.col[col_H], 

266 "T_out": np.nan, 

267 "Q_pot": np.nan, 

268 "dt_tar": np.nan, 

269 "dt_sup": np.nan, 

270 "Q_tt": np.nan, 

271 }, add_default_labels=False) 

272 hl.data = hl.data[:pinch_row+1] if is_hot_ut else hl.data[pinch_row-1:] 

273 

274 Q_assigned = 0.0 

275 for u in reversed(u_ls) if is_hot_ut else u_ls: 275 ↛ 290line 275 didn't jump to line 290 because the loop on line 275 didn't complete

276 Ts, Tt = ( 

277 (u.t_max, u.t_min) if real_T else (u.t_max_star, u.t_min_star) 

278 ) if is_hot_ut else ( 

279 (u.t_min, u.t_max) if real_T else (u.t_min_star, u.t_max_star) 

280 ) 

281 

282 Q_ut_max = _maximise_utility_duty(hl, Ts, Tt, col_T, col_H, is_hot_ut, Q_assigned) 

283 if Q_ut_max > ZERO: 

284 u.set_heat_flow(Q_ut_max) 

285 Q_assigned += Q_ut_max 

286 

287 if abs(hl.loc[0 if is_hot_ut else -1, col_H] - Q_assigned) < ZERO: 

288 break 

289 

290 return u_ls 

291 

292 

293def _maximise_utility_duty(hl_in: ProblemTable, Ts: float, Tt: float, col_T: Enum, col_H: Enum, is_hot_ut: bool, Q_assigned: float) -> Tuple[float, int]: 

294 hl: ProblemTable = hl_in.copy 

295 sgn = 1 if is_hot_ut else -1 

296 hl.col["T_out"] = hl.shift(col_T, sgn) 

297 hl.col["Q_pot"] = hl.shift(col_H, sgn) - Q_assigned 

298 hl.col["dt_tar"] = (Tt - hl.col[col_T]) * sgn 

299 hl.col["dt_sup"] = (Ts - hl.shift(col_T, sgn)) * sgn 

300 hl.data = hl.data[1:] if is_hot_ut else hl.data[:-1] 

301 hl.data = hl.data[ 

302 (hl.shift(col_H, sgn) != hl.col[col_H]) & 

303 (hl.col["dt_sup"] >= -ZERO) & 

304 (hl.col["Q_pot"] > ZERO) 

305 ] 

306 

307 if hl.data.size == 0 or hl.col["dt_tar"].max() < 0: 

308 return 0.0 

309 

310 if hl.col["dt_tar"].max() < 0: 310 ↛ 311line 310 didn't jump to line 311 because the condition on line 310 was never true

311 return 0.0 

312 Q_ts_max = hl.col["Q_pot"].max() 

313 

314 Q_tt = np.full_like(hl.col["Q_pot"], np.inf) 

315 mask = -hl.col["dt_tar"] > ZERO 

316 Q_tt[mask] = hl.col["Q_pot"][mask] / -hl.col["dt_tar"][mask] * abs(Tt - Ts) 

317 Q_tt_max = Q_tt.min() 

318 

319 return min(Q_ts_max, Q_tt_max) if hl.col["dt_tar"].max() >= 0 else 0.0