Coverage for backend/pinch_service/OpenPinch/src/analysis/utility_targeting.py: 95%
161 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1import numpy as np
2from ..utils import *
3from ..lib.enums import *
4from ..classes import *
5from .support_methods import *
6from .problem_table_analysis import calc_problem_table
# Public names exported by ``from <module> import *``. Each entry must be a
# separate string naming an attribute of this module.
__all__ = ["get_zonal_utility_targets", "target_utility", "calc_GGC_utility"]
10#######################################################################################################
11# Public API
12#######################################################################################################
def get_zonal_utility_targets(
    pt: ProblemTable,
    pt_real: ProblemTable,
    hot_utilities: StreamCollection,
    cold_utilities: StreamCollection,
    is_process_zone: bool = True,
) -> Tuple[ProblemTable, ProblemTable, StreamCollection, StreamCollection]:
    """Compute the GCC variants and utility profiles for one zone.

    The function runs the GCC helper steps on ``pt`` in sequence, optionally
    targets the hot and cold utilities against the resulting net profiles,
    and then adds the utility GCC and its separated profiles to both ``pt``
    (shifted temperatures) and ``pt_real`` (``shifted=False``).

    Args:
        pt: Problem table processed with ``shifted=True``.
        pt_real: Problem table processed with ``shifted=False``; it also
            receives the balanced composite curve columns.
        hot_utilities: Hot utility streams to target.
        cold_utilities: Cold utility streams to target.
        is_process_zone: When true, pockets are removed from the GCC and
            the utilities are targeted; when false both steps are skipped.

    Returns:
        ``(pt, pt_real, hot_utilities, cold_utilities)``.
    """
    # Calculate various GCC profiles
    if is_process_zone:
        pt = _calc_GCC_without_pockets(pt)
    pt = _calc_GCC_with_vertical_heat_transfer(pt)
    pt = _calc_GCC_actual(pt, is_process_zone)
    pt = _calc_GGC_pockets(pt)
    # Add assisted integration targeting here...
    pt = _calc_seperated_heat_load_profiles(pt, col_H_net=PT.H_NET_A.value)

    # Target multiple utility use
    if is_process_zone:
        hot_utilities = target_utility(hot_utilities, pt, PT.T.value, PT.H_COLD_NET.value)
        cold_utilities = target_utility(cold_utilities, pt, PT.T.value, PT.H_HOT_NET.value)

    # Utility GCC and its separated hot/cold profiles on the shifted table
    pt = calc_GGC_utility(pt, hot_utilities, cold_utilities, shifted=True)
    pt = _calc_seperated_heat_load_profiles(
        pt,
        col_H_net=PT.H_UT_NET.value,
        col_H_cold_net=PT.H_COLD_UT.value,
        col_H_hot_net=PT.H_HOT_UT.value,
    )

    # Same again on the unshifted table, plus the balanced composite curves
    pt_real = calc_GGC_utility(pt_real, hot_utilities, cold_utilities, shifted=False)
    pt_real = _calc_seperated_heat_load_profiles(
        pt_real,
        col_H_net=PT.H_UT_NET.value,
        col_H_cold_net=PT.H_COLD_UT.value,
        col_H_hot_net=PT.H_HOT_UT.value,
    )
    pt_real = _calc_balanced_CC(pt_real)

    return pt, pt_real, hot_utilities, cold_utilities
def target_utility(utilities: List[Stream], pt: ProblemTable, col_T: str, col_H: str, real_T=False) -> List[Stream]:
    """Target the duty of each utility against a heat-load profile.

    Works on a rounded copy of ``pt`` so the caller's table is not modified.
    If the ``col_H`` profile contains negative values it is sign-flipped
    before use. The type of the first utility decides whether the hot-side
    or the cold-side assignment is run; when the relevant end of the profile
    is (near) zero no assignment happens.

    Args:
        utilities: Utility streams, all expected to be of the same kind.
        pt: Problem table holding the ``col_T`` and ``col_H`` columns.
        col_T: Name of the temperature column.
        col_H: Name of the heat-load column to target against.
        real_T: Forwarded to the assignment helper to choose between the
            utilities' real and shifted temperatures.

    Returns:
        The utilities collection (the same object when it is empty).
    """
    if len(utilities) == 0:
        return utilities

    table = pt.copy
    table.round(6)
    hot_pinch_row, cold_pinch_row, _ = get_pinch_loc(table, col_H)

    # Work with a non-negative profile
    if table.col[col_H].min() < -ZERO:
        table.col[col_H] = table.col[col_H] * -1

    lead_type = utilities[0].type

    if lead_type == StreamType.Hot.value and abs(table.loc[0, col_H]) > ZERO:
        return _assign_utility(
            table, col_T, col_H, utilities, hot_pinch_row, is_hot_ut=True, real_T=real_T
        )

    if lead_type == StreamType.Cold.value and abs(table.loc[-1, col_H]) > ZERO:
        return _assign_utility(
            table, col_T, col_H, utilities, cold_pinch_row, is_hot_ut=False, real_T=real_T
        )

    return utilities
def calc_GGC_utility(pt: ProblemTable, hot_utilities: List[Stream], cold_utilities: List[Stream], shifted: bool = True) -> ProblemTable:
    """Add the utility GCC and utility RCP columns to ``pt``.

    A temporary problem table is built from ``pt``'s temperature column and
    the utility streams via ``calc_problem_table``. From it the function
    writes to ``pt``:

    * ``H_UT_NET`` as ``max(H_NET) - H_NET`` of the temporary table,
    * ``RCP_HOT_UT`` and ``RCP_COLD_UT`` copied from ``RCP_HOT``/``RCP_COLD``,
    * ``RCP_UT_NET`` as their sum.

    Returns:
        The same ``pt`` object, updated in place.
    """
    temperature_only = ProblemTable({PT.T.value: pt.col[PT.T.value]})
    utility_pt = calc_problem_table(temperature_only, hot_utilities, cold_utilities, shifted)

    h_net = utility_pt.col[PT.H_NET.value]
    rcp_hot = utility_pt.col[PT.RCP_HOT.value]
    rcp_cold = utility_pt.col[PT.RCP_COLD.value]

    pt.col[PT.H_UT_NET.value] = h_net.max() - h_net
    pt.col[PT.RCP_HOT_UT.value] = rcp_hot
    pt.col[PT.RCP_COLD_UT.value] = rcp_cold
    pt.col[PT.RCP_UT_NET.value] = rcp_hot + rcp_cold
    return pt
73#######################################################################################################
74# Helper functions: get_zonal_utility_targets
75#######################################################################################################
def _calc_GCC_without_pockets(pt: ProblemTable, col_H_NP: str = PT.H_NET_NP.value, col_H: str = PT.H_NET.value) -> ProblemTable:
    """Write a pocket-free copy of the GCC into ``col_H_NP``.

    ``col_H_NP`` starts as a copy of ``col_H``. If ``get_pinch_loc`` reports
    a valid pinch, rows strictly between the hot and cold pinch are set to
    zero and pocket segments on each side of the pinch are then flattened.

    Args:
        pt: Problem table to update.
        col_H_NP: Name of the column receiving the pocket-free profile.
        col_H: Name of the source net heat-load column.

    Returns:
        The updated problem table. The pocket-removal helper may return a
        different table object than the one passed in.
    """
    pt.col[col_H_NP] = pt.col[col_H]

    # NOTE(review): the pinch is located with get_pinch_loc's default column,
    # not col_H -- confirm this is intended when a non-default col_H is used.
    hot_pinch_loc, cold_pinch_loc, valid = get_pinch_loc(pt)
    if not valid:
        return pt

    # Remove any possible pocket segments between the pinches
    # (the range is empty when the pinches coincide or are adjacent)
    for j in range(hot_pinch_loc + 1, cold_pinch_loc):
        pt.loc[j, col_H_NP] = 0

    # Remove pocket segments above the Pinch; the pinch rows may shift if
    # temperature intervals are inserted, so carry the updated values on.
    pt, hot_pinch_loc, cold_pinch_loc = _remove_pockets_on_one_side_of_the_pinch(
        pt, col_H_NP, col_H, hot_pinch_loc, cold_pinch_loc, True
    )

    # Remove pocket segments below the Pinch
    pt, _, _ = _remove_pockets_on_one_side_of_the_pinch(
        pt, col_H_NP, col_H, hot_pinch_loc, cold_pinch_loc, False
    )

    return pt
def _remove_pockets_on_one_side_of_the_pinch(pt: ProblemTable, col_H_NP: str =PT.H_NET_NP.value, col_H: str =PT.H_NET.value, hot_pinch_loc: int = None, cold_pinch_loc: int = None, is_above_pinch: bool = True) -> Tuple[ProblemTable, int, int]:
    """Flatten pocket segments of the GCC on one side of the pinch.

    Walks row by row from the outer end of the table (row 0 when
    ``is_above_pinch`` is true, the last row otherwise) towards the pinch.
    Where ``col_H`` rises in the walking direction, the pocket's exit row is
    located, a temperature interval may be inserted at the exit, and the
    ``col_H_NP`` values inside the pocket are overwritten with the heat load
    at the pocket's entry row.

    Args:
        pt: Problem table to update.
        col_H_NP: Column holding the pocket-free profile being written.
        col_H: Column holding the source net heat load.
        hot_pinch_loc: Row index of the hot pinch.
        cold_pinch_loc: Row index of the cold pinch.
        is_above_pinch: Selects which side of the pinch is processed.

    Returns:
        ``(pt, hot_pinch_loc, cold_pinch_loc)``; both pinch indices are
        increased when rows are inserted while processing above the pinch.
    """

    # Settings for removing pocket segments for above/below the pinch
    if is_above_pinch:
        i = 0
        pinch_loc = hot_pinch_loc
        sgn = 1
    else:
        i = len(pt) - 1
        pinch_loc = cold_pinch_loc
        sgn = -1

    T_vals, H_vals, H_NP_vals = pt.col[PT.T.value], pt.col[col_H], pt.col[col_H_NP]

    if H_vals[i] < ZERO:
        # No heating or cooling required
        return pt, hot_pinch_loc, cold_pinch_loc

    # The loop variable only bounds the number of passes; ``i`` is advanced
    # manually because a pocket can skip several rows at once.
    for _ in range(i, pinch_loc, sgn):
        di = sgn
        n_int_added = 0
        # Heat load increases towards the pinch: row i is treated as the
        # entry of a pocket.
        if H_vals[i] < H_vals[i + sgn] - ZERO:
            i_0 = i
            i = _pocket_exit_index(H_vals, i, pinch_loc, sgn)

            if i != pinch_loc:
                # Presumably the temperature between rows i and i + sgn at
                # which the heat load equals the entry value H_vals[i_0] --
                # TODO confirm against linear_interpolation's argument order.
                T0 = linear_interpolation(H_vals[i_0], H_vals[i], H_vals[i + sgn], T_vals[i], T_vals[i + sgn])
                pt, n_int_added = insert_temperature_interval_into_pt(pt, T0)

            if n_int_added > 0:
                # pt was reassigned by the insertion, so re-read the columns.
                T_vals, H_vals, H_NP_vals = pt.col[PT.T.value], pt.col[col_H], pt.col[col_H_NP]
                if is_above_pinch:
                    # NOTE(review): the local ``pinch_loc`` is not shifted
                    # together with hot_pinch_loc here -- confirm the loop
                    # bound and exit search are still meant to use the old row.
                    hot_pinch_loc += n_int_added
                    cold_pinch_loc += n_int_added
                else:
                    i_0 += n_int_added

            # Overwrite the pocket interior with the entry heat load.
            j_rng = range(i_0 + 1, i + 1) if is_above_pinch else range(i + 1, i_0)
            for j in j_rng:
                H_NP_vals[j] = H_vals[i_0]

            di = n_int_added * sgn

        i += di
        # Stop once the walk has reached or passed the pinch row.
        if (pinch_loc - i) * sgn <= 0:
            break

    return pt, hot_pinch_loc, cold_pinch_loc
def _pocket_exit_index(H_vals: np.ndarray, i_0: int, pinch_loc: int, sgn: int) -> int:
    """Locate the last row of a pocket that starts at row ``i_0``.

    Rows are scanned from the neighbour of ``i_0`` towards ``pinch_loc``
    (increasing indices when ``sgn > 0``, decreasing otherwise). At the
    first row whose heat load is at least ``ZERO`` below ``H_vals[i_0]``,
    the row visited just before it is returned. If no such row exists up to
    and including the pinch row, ``pinch_loc`` is returned.
    """
    step = 1 if sgn > 0 else -1
    entry_load = H_vals[i_0]

    for row in range(i_0 + step, pinch_loc + step, step):
        if entry_load >= H_vals[row] + ZERO:
            return row - step

    return pinch_loc
def _calc_GCC_with_vertical_heat_transfer(pt: ProblemTable) -> ProblemTable:
    """Fill ``H_NET_V``, the GCC assuming vertical heat transfer on the composite curves.

    Two passes write the column: the first handles rows where the cold
    composite exceeds the first-row hot composite value, the second
    overrides rows where the hot composite is below the last-row net heat
    load. All other rows are zero.
    """
    v_col = PT.H_NET_V.value

    # Top section of the vGCC
    hot_cc_first = pt.loc[0, PT.H_HOT.value]
    cold_cc = pt.col[PT.H_COLD.value]
    pt.col[v_col] = np.where(cold_cc > hot_cc_first, cold_cc - hot_cc_first, 0.0)

    # Bottom section of the vGCC
    net_last = pt.loc[-1, PT.H_NET.value]
    hot_cc = pt.col[PT.H_HOT.value]
    pt.col[v_col] = np.where(hot_cc < net_last, net_last - hot_cc, pt.col[v_col])

    return pt
def _calc_GCC_actual(pt: ProblemTable, is_process_zone: bool = True, f_horizontal_HT: float = 1.0) -> ProblemTable:
    """Write the actual GCC into ``H_NET_A``.

    The result blends a base profile with the vertical-heat-transfer
    profile ``H_NET_V``::

        H_NET_A = base * f_horizontal_HT + H_NET_V * (1 - f_horizontal_HT)

    Args:
        pt: Problem table to update.
        is_process_zone: Selects the base profile: the pocket-free GCC
            (``H_NET_NP``) for a process zone, otherwise the plain GCC
            (``H_NET``).
        f_horizontal_HT: Weight given to the base profile; the remainder
            goes to ``H_NET_V``.

    Returns:
        The same ``pt`` object, updated in place.
    """
    # Pick the base column first so that is_process_zone actually affects
    # the blended result (previously the blend always used H_NET_NP).
    base_col = PT.H_NET_NP.value if is_process_zone else PT.H_NET.value
    pt.col[PT.H_NET_A.value] = (
        pt.col[base_col] * f_horizontal_HT
        + pt.col[PT.H_NET_V.value] * (1 - f_horizontal_HT)
    )
    return pt
def _calc_GGC_pockets(pt: ProblemTable) -> ProblemTable:
    """Store in ``H_NET_PK`` the difference between ``H_NET`` and ``H_NET_NP``."""
    full_gcc = pt.col[PT.H_NET.value]
    pocket_free_gcc = pt.col[PT.H_NET_NP.value]
    pt.col[PT.H_NET_PK.value] = full_gcc - pocket_free_gcc
    return pt
def _calc_seperated_heat_load_profiles(
    pt: ProblemTable,
    col_H_net: str = PT.H_NET_A.value,
    col_H_hot_net: str = PT.H_HOT_NET.value,
    col_H_cold_net: str = PT.H_COLD_NET.value,
    col_RCP_net: str = PT.RCP_UT_NET.value,
    col_RCP_hot_net: str = PT.RCP_HOT_UT.value,
    col_RCP_cold_net: str = PT.RCP_COLD_UT.value,
) -> ProblemTable:
    """Split a net heat-load profile into separate hot and cold profiles.

    Row-to-row changes of ``col_H_net`` (from ``pt.delta_col``) are sorted
    by sign: non-positive changes are accumulated into ``col_H_hot_net``,
    positive changes into ``col_H_cold_net``. ``col_RCP_net`` is split into
    ``col_RCP_hot_net`` / ``col_RCP_cold_net`` with the same row masks.
    Finally the hot profile is negated and the cold profile is offset by
    the negative of its last-row value.

    Returns:
        The same ``pt`` object, updated in place.
    """
    # Row-wise change of the net profile
    delta_h = pt.delta_col(col_H_net, 1)

    # Row masks: a non-positive change counts as hot side, the rest as cold side
    hot_rows = delta_h <= 0
    cold_rows = ~hot_rows

    # Cumulative enthalpy change on each side
    pt.col[col_H_hot_net] = np.cumsum(-delta_h * hot_rows)
    pt.col[col_H_cold_net] = np.cumsum(-delta_h * cold_rows)

    # RCP (HTR x CP) split with the same masks
    pt.col[col_RCP_hot_net] = pt.col[col_RCP_net] * hot_rows
    pt.col[col_RCP_cold_net] = pt.col[col_RCP_net] * cold_rows

    # Flip the hot profile, then shift the cold profile by its final value
    pt.col[col_H_hot_net] = pt.col[col_H_hot_net] * -1
    cold_offset = -pt.loc[-1, col_H_cold_net]
    pt.col[col_H_cold_net] = pt.col[col_H_cold_net] + cold_offset

    return pt
def _calc_balanced_CC(pt: ProblemTable) -> ProblemTable:
    """Add the balanced composite curve columns (process plus utility).

    ``H_HOT_BAL`` is ``H_HOT + H_HOT_UT`` and ``H_COLD_BAL`` is
    ``H_COLD + H_COLD_UT``. Returns the same ``pt`` object.
    """
    hot_process, hot_utility = pt.col[PT.H_HOT.value], pt.col[PT.H_HOT_UT.value]
    pt.col[PT.H_HOT_BAL.value] = hot_process + hot_utility

    cold_process, cold_utility = pt.col[PT.H_COLD.value], pt.col[PT.H_COLD_UT.value]
    pt.col[PT.H_COLD_BAL.value] = cold_process + cold_utility

    return pt
227#######################################################################################################
228# Helper functions: assisted integration
229#######################################################################################################
def _calc_GCC_assisted_integration(pt: ProblemTable, dt_cut: float = 10, dt_cut_min: float = 0) -> ProblemTable:
    """Placeholder for assisted-integration GCC targeting.

    The implementation below is commented out, so the function currently
    returns ``pt`` unchanged and ignores ``dt_cut`` and ``dt_cut_min``.
    """

    # Disabled draft of the intended algorithm, kept for reference.
    # NOTE(review): as drafted, the ``else: i += 1`` branch moves away from
    # the ``while i > 0`` exit condition -- revisit before enabling.

    # pt.col[PT.H_NET_PK.value] = pt.col[PT.H_NET.value] - pt.col[PT.H_NET_NP.value]

    # if np.sum(pt.col[PT.H_NET_PK.value]) < ZERO * len(pt):
    #     pt.col[PT.H_NET_AI.value] = pt.col[PT.H_NET.value]
    #     return pt

    # i = len(pt)
    # while i > 0:
    #     if pt.loc[i - 1, PT.H_NET_PK.value] > ZERO:
    #         i_lb = i
    #         for i in range(i, 0, -1):
    #             if pt.loc[i, PT.H_NET_PK.value] < ZERO:
    #                 break
    #         i_ub = i
    #         _compute_pocket_temperature_differences

    #     else:
    #         i += 1

    # pt.col[PT.H_NET_AI.value] = pt.col[PT.H_NET.value] - pt.col[PT.H_NET_PK.value]
    return pt
257#######################################################################################################
258# Helper functions: target_utility
259#######################################################################################################
def _assign_utility(pt: ProblemTable, col_T: str, col_H: str, u_ls: List[Stream], pinch_row: int, is_hot_ut: bool, real_T: bool) -> List[Stream]:
    """Assign heat duties to the utilities on one side of the pinch.

    A working table is built from the ``col_T`` and ``col_H`` columns of
    ``pt`` and truncated at the pinch. The utilities are then visited in
    order (reversed for hot utilities); each receives the largest duty
    ``_maximise_utility_duty`` allows given what has already been assigned,
    until the heat load at the outer end of the table is fully covered.

    Args:
        pt: Problem table holding the profile to satisfy.
        col_T: Name of the temperature column.
        col_H: Name of the heat-load column.
        u_ls: Utility streams to assign; updated via ``set_heat_flow``.
        pinch_row: Row index of the pinch bounding the working table.
        is_hot_ut: True for hot utilities (rows up to the pinch are used),
            false for cold utilities (rows from just before the pinch on).
        real_T: Use the utilities' real temperatures instead of the
            shifted (``*_star``) ones.

    Returns:
        ``u_ls``, with heat flows set on the utilities that received duty.
    """
    hl = ProblemTable({
        col_T: pt.col[col_T],
        col_H: pt.col[col_H],
        "T_out": np.nan,
        "Q_pot": np.nan,
        "dt_tar": np.nan,
        "dt_sup": np.nan,
        "Q_tt": np.nan,
    }, add_default_labels=False)

    if is_hot_ut:
        hl.data = hl.data[:pinch_row + 1]
    else:
        # Clamp the start at 0: with pinch_row == 0 a start of -1 would wrap
        # around and keep only the last row instead of the whole table.
        hl.data = hl.data[max(pinch_row - 1, 0):]

    Q_assigned = 0.0
    for u in reversed(u_ls) if is_hot_ut else u_ls:
        # Supply (Ts) and target (Tt) temperature of the utility: a hot
        # utility runs from its maximum to its minimum temperature, a cold
        # utility the other way round.
        if is_hot_ut:
            Ts, Tt = (u.t_max, u.t_min) if real_T else (u.t_max_star, u.t_min_star)
        else:
            Ts, Tt = (u.t_min, u.t_max) if real_T else (u.t_min_star, u.t_max_star)

        Q_ut_max = _maximise_utility_duty(hl, Ts, Tt, col_T, col_H, is_hot_ut, Q_assigned)
        if Q_ut_max > ZERO:
            u.set_heat_flow(Q_ut_max)
            Q_assigned += Q_ut_max

        # Stop once the total demand at the outer end of the table is met.
        if abs(hl.loc[0 if is_hot_ut else -1, col_H] - Q_assigned) < ZERO:
            break

    return u_ls
def _maximise_utility_duty(hl_in: ProblemTable, Ts: float, Tt: float, col_T: str, col_H: str, is_hot_ut: bool, Q_assigned: float) -> float:
    """Return the largest duty a single utility can take on a heat-load profile.

    Works on a copy of ``hl_in``. For every row the helper columns are
    derived from the neighbouring row (``hl.shift`` by +1 for hot, -1 for
    cold utilities):

    * ``T_out``  - neighbouring temperature,
    * ``Q_pot``  - neighbouring heat load minus ``Q_assigned``,
    * ``dt_tar`` - ``(Tt - T) * sgn``,
    * ``dt_sup`` - ``(Ts - T_out) * sgn``.

    Rows are kept only if the heat load differs from its neighbour,
    ``dt_sup >= -ZERO`` and ``Q_pot > ZERO``.

    Args:
        hl_in: Working table truncated at the pinch; not modified.
        Ts: Utility supply temperature.
        Tt: Utility target temperature.
        col_T: Name of the temperature column.
        col_H: Name of the heat-load column.
        is_hot_ut: True for a hot utility, false for a cold utility.
        Q_assigned: Duty already assigned to previously handled utilities.

    Returns:
        The maximum assignable duty, or ``0.0`` when no row qualifies or
        the largest ``dt_tar`` is not non-negative.
    """
    hl: ProblemTable = hl_in.copy
    sgn = 1 if is_hot_ut else -1
    hl.col["T_out"] = hl.shift(col_T, sgn)
    hl.col["Q_pot"] = hl.shift(col_H, sgn) - Q_assigned
    hl.col["dt_tar"] = (Tt - hl.col[col_T]) * sgn
    hl.col["dt_sup"] = (Ts - hl.shift(col_T, sgn)) * sgn

    # Drop the end row that has no neighbour in the shift direction.
    hl.data = hl.data[1:] if is_hot_ut else hl.data[:-1]
    hl.data = hl.data[
        (hl.shift(col_H, sgn) != hl.col[col_H]) &
        (hl.col["dt_sup"] >= -ZERO) &
        (hl.col["Q_pot"] > ZERO)
    ]

    # Single exit for "nothing assignable". ``not (x >= 0)`` also covers a
    # NaN maximum, which previously reached 0.0 through the final ternary.
    if hl.data.size == 0 or not (hl.col["dt_tar"].max() >= 0):
        return 0.0

    Q_pot = hl.col["Q_pot"]
    dt_tar = hl.col["dt_tar"]

    # Limit 1: the largest remaining heat load among the candidate rows.
    Q_ts_max = Q_pot.max()

    # Limit 2: for rows with negative dt_tar, scale the row's heat load by
    # |Tt - Ts| / (-dt_tar); rows without a negative dt_tar impose no limit.
    Q_tt = np.full_like(Q_pot, np.inf)
    mask = -dt_tar > ZERO
    Q_tt[mask] = Q_pot[mask] / -dt_tar[mask] * abs(Tt - Ts)
    Q_tt_max = Q_tt.min()

    return min(Q_ts_max, Q_tt_max)