Coverage for backend/pinch_service/OpenPinch/src/analysis/problem_table_analysis.py: 95%
127 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1import numpy as np
2from typing import Tuple
3from ..utils.water_properties import Tsat_p
4from ..lib import *
5from ..classes import StreamCollection, ProblemTable, Stream
6from .support_methods import insert_temperature_interval_into_pt, linear_interpolation
8__all__ = ["problem_table_algorithm, calc_problem_table"]
11#######################################################################################################
12# Public API
13#######################################################################################################
15def problem_table_algorithm(hot_streams: StreamCollection, cold_streams: StreamCollection, all_streams: StreamCollection, config: Configuration) -> Tuple[ProblemTable, ProblemTable, dict]:
16 """Perform the problem table algorithm for a given set of hot and cold streams."""
17 # Get all possible temperature intervals, remove duplicates and order from high to low
18 pt, pt_real = _get_temperature_intervals(streams=all_streams, config=config)
20 # Perform the heat cascade of the problem table
21 pt, pt_real = calc_problem_table(pt, hot_streams, cold_streams), calc_problem_table(pt_real, hot_streams, cold_streams, False)
22 target_values = _set_zonal_targets(pt, pt_real)
24 # Correct the location of the cold composite curve and limiting GCC (real temperatures)
25 pt_real = _correct_pt_composite_curves(pt_real, target_values["heat_recovery_target"], target_values["heat_recovery_limit"])
27 # Add additional temperature intervals for ease in targeting utility etc
28 pt, pt_real = _add_temperature_intervals_at_constant_h(target_values["heat_recovery_target"], pt, pt_real)
30 return pt, pt_real, target_values
33def calc_problem_table(pt: ProblemTable, hot_streams: StreamCollection = None, cold_streams: StreamCollection = None, shifted: bool = True) -> ProblemTable:
34 """Fast calculation of the problem table using vectorized operations."""
36 # If streams are provided, calculate CP and RCP contributions per temperature interval
37 if hot_streams is not None and cold_streams is not None: 37 ↛ 47line 37 didn't jump to line 47 because the condition on line 37 was always true
38 cp_hot, rcp_hot, cp_cold, rcp_cold = _sum_mcp_between_temperature_boundaries(
39 pt.col[PT.T.value], hot_streams, cold_streams, shifted
40 )
41 pt.col[PT.CP_HOT.value] = cp_hot
42 pt.col[PT.RCP_HOT.value] = rcp_hot
43 pt.col[PT.CP_COLD.value] = cp_cold
44 pt.col[PT.RCP_COLD.value] = rcp_cold
46 # Extract numeric arrays for fast computation
47 T_col = pt.col[PT.T.value]
48 cp_hot = pt.col[PT.CP_HOT.value]
49 cp_cold = pt.col[PT.CP_COLD.value]
51 # ΔT = T_i - T_i+1, with first value set to 0.0
52 delta_T = np.empty_like(T_col)
53 delta_T[0] = 0.0
54 delta_T[1:] = T_col[:-1] - T_col[1:]
55 pt.col[PT.DELTA_T.value] = delta_T
57 # ΔH_HOT = ΔT * CP_HOT
58 delta_H_hot = delta_T * cp_hot
59 H_hot = np.cumsum(delta_H_hot)
60 pt.col[PT.DELTA_H_HOT.value] = delta_H_hot
61 pt.col[PT.H_HOT.value] = H_hot
63 # ΔH_COLD = ΔT * CP_COLD
64 delta_H_cold = delta_T * cp_cold
65 H_cold = np.cumsum(delta_H_cold)
66 pt.col[PT.DELTA_H_COLD.value] = delta_H_cold
67 pt.col[PT.H_COLD.value] = H_cold
69 # MCP_NET = CP_COLD - CP_HOT
70 mcp_net = cp_cold - cp_hot
71 pt.col[PT.MCP_NET.value] = mcp_net
73 # ΔH_NET = ΔT * MCP_NET
74 delta_H_net = delta_T * mcp_net
75 H_net = -np.cumsum(delta_H_net)
76 pt.col[PT.DELTA_H_NET.value] = delta_H_net
77 pt.col[PT.H_NET.value] = H_net
79 # Find minimum H_NET (for cascade shifting)
80 min_H = H_net.min()
82 # Shift the composite curves for alignment
83 pt.col[PT.H_HOT.value] = H_hot[-1] - H_hot
84 pt.col[PT.H_COLD.value] = H_cold[-1] + (H_net[-1] - min_H) - H_cold
85 pt.col[PT.H_NET.value] = H_net - min_H
87 return pt
90#######################################################################################################
91# Helper functions
92#######################################################################################################
94def _get_temperature_intervals(streams: List[Stream] = [], config: Configuration = None) -> Tuple[ProblemTable, ProblemTable]:
95 """Returns unordered T and T* intervals for given streams and utilities."""
97 T_star = [t for s in streams for t in (s.t_min_star, s.t_max_star)]
98 T = [t for s in streams for t in (s.t_min, s.t_max)]
100 if isinstance(config, Configuration): 100 ↛ 110line 100 didn't jump to line 110 because the condition on line 100 was always true
101 if config.TURBINE_WORK_BUTTON: 101 ↛ 102line 101 didn't jump to line 102 because the condition on line 101 was never true
102 T_val = config.T_TURBINE_BOX
103 Tsat_val = Tsat_p(config.P_TURBINE_BOX)
104 T_star.extend([T_val, Tsat_val])
105 T.extend([T_val, Tsat_val])
107 T_star.append(config.TEMP_REF)
108 T.append(config.TEMP_REF)
110 pt = ProblemTable({PT.T.value: sorted(set(T_star), reverse=True)})
111 pt_real = ProblemTable({PT.T.value: sorted(set(T), reverse=True)})
113 return pt, pt_real
116def _sum_mcp_between_temperature_boundaries(
117 temperatures: List[float],
118 hot_streams: List[Stream],
119 cold_streams: List[Stream],
120 shifted: bool = True,
121) -> Tuple[List[float], List[float], List[float], List[float]]:
122 """Vectorized CP and rCP summation across temperature intervals."""
124 def calc_active_matrix(streams: List[Stream], use_shifted: bool) -> np.ndarray:
125 t_min = np.array([s.t_min_star if use_shifted else s.t_min for s in streams])
126 t_max = np.array([s.t_max_star if use_shifted else s.t_max for s in streams])
128 lower_bounds = np.array(temperatures[1:])
129 upper_bounds = np.array(temperatures[:-1])
131 # Shape: (intervals, streams)
132 active = (t_max[np.newaxis, :] > lower_bounds[:, np.newaxis] + ZERO) & \
133 (t_min[np.newaxis, :] < upper_bounds[:, np.newaxis] - ZERO)
135 return active
137 def sum_cp_rcp(streams: List[Stream], active: np.ndarray):
138 cp = np.array([s.CP for s in streams])
139 rcp = np.array([s.rCP for s in streams])
140 cp_sum = active @ cp
141 rcp_sum = active @ rcp
142 return np.insert(cp_sum, 0, 0.0), np.insert(rcp_sum, 0, 0.0)
144 hot_active = calc_active_matrix(hot_streams, shifted)
145 cold_active = calc_active_matrix(cold_streams, shifted)
147 cp_hot, rcp_hot = sum_cp_rcp(hot_streams, hot_active)
148 cp_cold, rcp_cold = sum_cp_rcp(cold_streams, cold_active)
150 return cp_hot.tolist(), rcp_hot.tolist(), cp_cold.tolist(), rcp_cold.tolist()
153def _correct_pt_composite_curves(pt: ProblemTable, heat_recovery_target: float, heat_recovery_limit: float) -> ProblemTable:
154 """Shift H_COLD and H_NET columns to match targeted heat recovery levels."""
155 delta_shift = heat_recovery_limit - heat_recovery_target
156 pt.col[PT.H_COLD.value] += delta_shift
157 pt.col[PT.H_NET.value] += delta_shift
158 return pt
161def _add_temperature_intervals_at_constant_h(heat_recovery_target: float, pt: ProblemTable, pt_real: ProblemTable):
162 if heat_recovery_target > ZERO:
163 pt = _insert_temperature_interval_into_pt_at_constant_h(pt, PT.T.value, PT.H_HOT.value, PT.H_COLD.value)
164 pt_real = _insert_temperature_interval_into_pt_at_constant_h(pt_real, PT.T.value, PT.H_HOT.value, PT.H_COLD.value)
165 return pt, pt_real
168def _insert_temperature_interval_into_pt_at_constant_h(pt: ProblemTable, col_T: str =PT.T.value, col_HCC: str =PT.H_HOT.value, col_CCC: str =PT.H_COLD.value) -> ProblemTable:
169 """Insert temperature intervals into the process table where HCC and CCC intersect at constant enthalpy."""
170 # --- HCC to CCC projection ---
171 i = _get_composite_curve_starting_points_loc(pt, col_HCC, hcc=True)
172 pt = _get_T_value_at_cc_starts(pt, i, col_T, col_HCC, col_CCC, hcc=True)
173 # --- CCC to HCC projection ---
174 i = _get_composite_curve_starting_points_loc(pt, col_CCC, hcc=False)
175 pt = _get_T_value_at_cc_starts(pt, i, col_T, col_HCC, col_CCC, hcc=False)
176 return pt
179def _get_composite_curve_starting_points_loc(pt: ProblemTable, col: str = PT.H_HOT.value, hcc: bool = True) -> int:
180 """Find the starting index of the composite curve to be projected."""
181 i_range = range(1, len(pt)) if hcc else range(len(pt) - 1, 0, -1)
182 for i in i_range: 182 ↛ 185line 182 didn't jump to line 185 because the loop on line 182 didn't complete
183 if pt.loc[i - 1, col] > pt.loc[i, col] + ZERO:
184 break
185 return i - 1 if hcc else i
188def _get_T_value_at_cc_starts(pt: ProblemTable, start_index: int, col_T: str =PT.T.value, col_HCC: str =PT.H_HOT.value, col_CCC: str =PT.H_COLD.value, hcc: bool =True) -> ProblemTable:
189 """Find and insert the temperature value where composite curves intersect at constant enthalpy."""
190 col0, col1 = (col_HCC, col_CCC) if hcc else (col_CCC, col_HCC)
192 # T_insert_vals = []
193 i = start_index
194 k = 0
195 while 0 < i < len(pt) and k < 2:
196 h_0 = pt.loc[i, col0]
197 j_range = range(len(pt) - 1, 0, -1) if hcc else range(1, len(pt) - 1)
199 for j in j_range:
200 if abs(pt.loc[j, col1] - h_0) < ZERO:
201 k += 1
202 break
204 z = 0 if hcc else 1
205 h_j = pt.loc[j + z, col1]
206 h_jm1 = pt.loc[j - 1 + z, col1]
208 if h_0 < h_jm1 - ZERO and h_0 > h_j + ZERO:
209 t_j = pt.loc[j + z, col_T]
210 t_jm1 = pt.loc[j - 1 + z, col_T]
211 T_C = linear_interpolation(h_0, h_j, h_jm1, t_j, t_jm1)
212 # T_insert_vals.append(T_C)
213 pt, n_int_added = insert_temperature_interval_into_pt(pt, [T_C])
214 k += n_int_added
215 break
216 i = j
218 # for T_C in T_insert_vals:
219 # pt = insert_temperature_interval_into_pt(pt, T_C)
221 return pt
224def _set_zonal_targets(pt: ProblemTable, pt_real: ProblemTable) -> dict:
225 """Assign thermal targets and integration degree to the zone based on process table data."""
226 return {
227 "hot_utility_target": pt.loc[0, PT.H_NET.value],
228 "cold_utility_target": pt.loc[-1, PT.H_NET.value],
229 "heat_recovery_target": pt.loc[0, PT.H_HOT.value] - pt.loc[-1, PT.H_NET.value],
230 "heat_recovery_limit": pt_real.loc[0, PT.H_HOT.value] - pt_real.loc[-1, PT.H_NET.value],
231 "degree_of_int": (
232 (pt.loc[0, PT.H_HOT.value] - pt.loc[-1, PT.H_NET.value]) / (pt_real.loc[0, PT.H_HOT.value] - pt_real.loc[-1, PT.H_NET.value])
233 if (pt_real.loc[0, PT.H_HOT.value] - pt_real.loc[-1, PT.H_NET.value]) > 0 else 1.0
234 )
235 }