Coverage for backend/pinch_service/OpenPinch/src/analysis/support_methods.py: 68%
107 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1from __future__ import annotations
2import numpy as np
3import math
4from typing import List, Tuple, Union
5from ..lib import *
6from ..classes.problem_table import ProblemTable
def get_pinch_loc(pt: ProblemTable, col=PT.H_NET.value) -> tuple[int, int, bool]:
    """Locate the hot and cold pinch rows of a heat-cascade column.

    A row counts as a "zero" when the absolute value of ``pt.col[col]`` at that
    row is below ``ZERO``.

    * Hot pinch: the first zero row when scanning from the top; if the profile
      starts with zeros, the last row of that leading run of zeros.
    * Cold pinch: the last zero row when scanning from the bottom; if the
      profile ends with zeros, the first row of that trailing run of zeros.

    When the column has no zero rows, or consists only of zero rows, the
    fallback ``row_h = n - 1`` and ``row_c = 0`` is used.

    Args:
        pt: Problem table whose ``col`` accessor yields the profile.
        col: Key of the column to scan.

    Returns:
        ``(row_h, row_c, valid)`` as plain Python ``int``, ``int`` and ``bool``,
        where ``valid`` is ``row_h <= row_c``.
    """
    # Pull out the 1-D profile as a NumPy array
    h_net = np.asarray(pt.col[col])
    n = h_net.size

    zeros_mask = np.abs(h_net) < ZERO
    zero_rows = np.flatnonzero(zeros_mask)
    nonzero_rows = np.flatnonzero(~zeros_mask)

    if zero_rows.size and nonzero_rows.size:
        # ---------- Hot pinch (scan top-down) ----------
        first_zero = zero_rows[0]
        # If the zero sits on the first row, step to the end of the leading
        # run of zeros (the row just above the first non-zero row).
        row_h = first_zero if first_zero > 0 else nonzero_rows[0] - 1

        # ---------- Cold pinch (scan bottom-up) ----------
        last_zero = zero_rows[-1]
        # If the zero sits on the last row, step to the start of the trailing
        # run of zeros (the row just below the last non-zero row).
        row_c = last_zero if last_zero < n - 1 else nonzero_rows[-1] + 1
    else:
        row_h = n - 1
        row_c = 0

    # Normalise NumPy scalars so both branches return the annotated types.
    row_h, row_c = int(row_h), int(row_c)
    return row_h, row_c, row_h <= row_c
def get_pinch_temperatures(pt: ProblemTable, col_T: str = PT.T.value, col_H=PT.H_NET.value) -> tuple[float | None, float | None]:
    """Return the temperatures at the hot and cold pinch rows.

    The pinch rows are found with ``get_pinch_loc`` on column ``col_H``; the
    temperatures are then read from column ``col_T`` of ``pt``.

    Returns:
        ``(T_hot_pinch, T_cold_pinch)``, or ``(None, None)`` when
        ``get_pinch_loc`` reports the location as not valid.
    """
    hot_row, cold_row, is_valid = get_pinch_loc(pt, col_H)
    if not is_valid:
        return None, None
    return pt.loc[hot_row, col_T], pt.loc[cold_row, col_T]
def shift_heat_cascade(pt: ProblemTable, dh: float, col: Union[int, str, Enum]) -> ProblemTable:
    """Add ``dh`` to every entry of one column of the problem table.

    The column is updated in place through ``pt.col``.

    Args:
        pt: Problem table to modify.
        dh: Offset added to the column.
        col: Key of the column to shift.

    Returns:
        The value of the ``pt.copy`` attribute.
        NOTE(review): ``copy`` is accessed, not called - presumably a property
        on ProblemTable that yields a copy; confirm against the class.
    """
    columns = pt.col
    columns[col] += dh
    return pt.copy
def insert_temperature_interval_into_pt(pt: ProblemTable, T_ls: List[float] | float) -> Tuple[ProblemTable, int]:
    """Insert new temperature rows into a ProblemTable with a descending T column.

    Each requested temperature is placed between the first pair of adjacent
    rows that strictly bracket it (with a clearance of ``ZERO`` on both
    sides). The new row takes its CP / mCp values from the lower bracketing
    row, its interval quantities are computed from the distance to the row
    above, and the cumulative enthalpy columns are linearly interpolated
    between the two bracketing rows (columns that are NaN in the lower row
    are skipped). The interval quantities of the row below are then
    recomputed for its shortened interval.

    Args:
        pt: Table to modify in place.
        T_ls: A single temperature (Python or NumPy int/float) or an iterable
            of temperatures.

    Returns:
        ``(pt, 1)`` once every requested temperature has been inserted, or
        ``(pt, 0)`` as soon as one temperature has no bracketing interval.
    """
    # Accept any real scalar, not just ``float`` (a bare int used to fail
    # with "object is not iterable").
    if isinstance(T_ls, (int, float, np.integer, np.floating)):
        T_ls = [T_ls]

    for T_new in T_ls:
        col = pt.col_index
        icol_T = col[PT.T.value]
        T_col = pt.data[:, icol_T]

        # Vectorized scan: entry k is True when rows k and k + 1 bracket T_new.
        brackets = (T_col[:-1] - ZERO > T_new) & (T_new > T_col[1:] + ZERO)
        hits = np.flatnonzero(brackets)
        if hits.size == 0:
            # No bracketing interval: T_new already exists or lies outside
            # the table.
            # NOTE(review): this returns immediately, so any remaining
            # temperatures in T_ls are not processed - confirm this is intended.
            return pt, 0
        insert_index = int(hits[0]) + 1

        row_top = pt.data[insert_index - 1]
        row_bot = pt.data[insert_index]

        # Interval properties are read from the lower bracketing row.
        cp_hot = row_bot[col[PT.CP_HOT.value]]
        cp_cold = row_bot[col[PT.CP_COLD.value]]
        mcp_net = row_bot[col[PT.MCP_NET.value]]

        delta_above = row_top[icol_T] - T_new
        delta_below = T_new - row_bot[icol_T]

        row_dict = {
            PT.T.value: T_new,
            PT.DELTA_T.value: delta_above,
            PT.CP_HOT.value: cp_hot,
            PT.DELTA_H_HOT.value: delta_above * cp_hot,
            PT.CP_COLD.value: cp_cold,
            PT.DELTA_H_COLD.value: delta_above * cp_cold,
            PT.MCP_NET.value: mcp_net,
            PT.DELTA_H_NET.value: delta_above * mcp_net,
        }

        # Interpolate the cumulative columns that hold data in the lower row.
        for key in (
            PT.H_HOT.value, PT.H_COLD.value, PT.H_NET.value,
            PT.H_NET_NP.value, PT.H_NET_A.value, PT.H_NET_V.value,
        ):
            i = col[key]
            if not np.isnan(row_bot[i]):
                row_dict[key] = linear_interpolation(
                    T_new,
                    row_bot[icol_T],
                    row_top[icol_T],
                    row_bot[i],
                    row_top[i],
                )

        # Insert the new row, then shrink the interval of the row now below it.
        pt.insert(row_dict, insert_index)

        below = insert_index + 1
        pt.data[below, col[PT.DELTA_T.value]] = delta_below
        pt.data[below, col[PT.DELTA_H_HOT.value]] = delta_below * cp_hot
        pt.data[below, col[PT.DELTA_H_COLD.value]] = delta_below * cp_cold
        pt.data[below, col[PT.DELTA_H_NET.value]] = delta_below * mcp_net

    return pt, 1
def key_name(zone_name: str, target_type: str = TargetType.DI.value):
    """Build the lookup key ``"<zone_name>/<target_type>"``."""
    return "{}/{}".format(zone_name, target_type)
def get_value(val: Union[float, dict, ValueWithUnit]) -> float:
    """Extract the numeric value from a number, a dict, or a ValueWithUnit.

    Args:
        val: A ``float`` (returned unchanged), an ``int`` (returned as
            ``float``), a dict (its ``'value'`` entry is returned), or a
            ``ValueWithUnit`` (its ``.value`` attribute is returned).

    Raises:
        TypeError: If ``val`` is none of the supported types. ``bool`` is
            not treated as a number.
    """
    if isinstance(val, float):
        return val
    # Plain integers are valid numeric inputs; bool is deliberately excluded
    # so it keeps falling through to the TypeError below.
    if isinstance(val, int) and not isinstance(val, bool):
        return float(val)
    if isinstance(val, dict):
        return val['value']
    if isinstance(val, ValueWithUnit):
        return val.value
    raise TypeError(f"Unsupported type: {type(val)}. Expected float, dict, or ValueWithUnit.")
def find_LMTD(T_hot_in: float, T_hot_out: float, T_cold_in: float, T_cold_out: float) -> float:
    """Compute the log mean temperature difference of a counterflow exchanger.

    Args:
        T_hot_in: Hot stream inlet temperature.
        T_hot_out: Hot stream outlet temperature.
        T_cold_in: Cold stream inlet temperature.
        T_cold_out: Cold stream outlet temperature.

    Returns:
        The LMTD; when both end differences agree to a relative tolerance of
        1e-6, that common difference is returned directly.

    Raises:
        ValueError: If the hot stream heats up, the cold stream cools down,
            or either end temperature difference is not positive.
    """
    # Stream directions must be consistent with a counter-current exchanger
    if T_hot_in < T_hot_out:
        raise ValueError("Hot fluid must cool down (T_hot_in > T_hot_out)")
    if T_cold_out < T_cold_in:
        raise ValueError("Cold fluid must heat up (T_cold_out > T_cold_in)")

    hot_end = T_hot_in - T_cold_out   # difference at the hot end
    cold_end = T_hot_out - T_cold_in  # difference at the cold end

    if hot_end <= 0 or cold_end <= 0:
        raise ValueError(f"Invalid temperature differences: ΔT1={hot_end}, ΔT2={cold_end}")

    # Equal end differences make the log formula 0/0; the LMTD is that value
    if math.isclose(hot_end, cold_end, rel_tol=1e-6):
        return hot_end

    return (hot_end - cold_end) / math.log(hot_end / cold_end)
def capital_recovery_factor(interest_rate: float, years: int) -> float:
    """Calculate the Capital Recovery Factor (CRF), also known as the annualisation factor.

    ``CRF = i * (1 + i)**n / ((1 + i)**n - 1)``

    Args:
        interest_rate: Interest rate per period as a fraction (``i``).
        years: Number of periods (``n``).

    Returns:
        The CRF. For a zero interest rate the limiting value ``1 / n`` is
        returned instead of dividing by zero.

    Raises:
        ZeroDivisionError: If ``years`` is 0.
    """
    i = interest_rate
    n = years
    if i == 0:
        # Limit of the formula as i -> 0: n equal repayments with no interest
        return 1 / n
    growth = (1 + i) ** n
    return i * growth / (growth - 1)
def compute_exergetic_temperature(T: float, T_ref_in_C: float = 15.0, units_of_T: str = "C") -> float:
    """Calculate the exergetic temperature of ``T`` relative to a reference temperature.

    Evaluates ``T_amb * (T_K / T_amb - 1 - ln(T_K / T_amb))`` with both
    temperatures on the absolute scale.

    Args:
        T: Temperature to evaluate, in the units given by ``units_of_T``.
        T_ref_in_C: Reference temperature; ``C_to_K`` is always added to it.
        units_of_T: ``"C"`` to have ``C_to_K`` added to ``T``, ``"K"`` to use
            ``T`` unchanged.

    Raises:
        ValueError: If ``units_of_T`` is not ``"C"`` or ``"K"``, or if the
            absolute temperature is not positive.
    """
    # Marmolejo-Correa, D., Gundersen, T., 2013. New Graphical Representation of Exergy Applied to Low Temperature Process Design.
    # Industrial & Engineering Chemistry Research 52, 7145–7156. https://doi.org/10.1021/ie302541e
    if units_of_T not in ("C", "K"):
        raise ValueError("units must be either 'C' or 'K'")

    ambient_K = T_ref_in_C + C_to_K  # reference on the absolute scale
    if units_of_T == "C":
        absolute_T = T + C_to_K
    else:
        absolute_T = T

    if absolute_T <= 0:
        raise ValueError("Absolute temperature must be > 0 K")

    rel = absolute_T / ambient_K
    return ambient_K * (rel - 1 - math.log(rel))
def linear_interpolation(x: float, x1: float, x2: float, y1: float, y2: float) -> float:
    """Estimate y at ``x`` on the straight line through (x1, y1) and (x2, y2).

    Raises:
        ValueError: If ``x1 == x2``, because the slope is undefined.
    """
    if x1 == x2:
        raise ValueError("Cannot perform interpolation when x1 == x2 (undefined slope).")
    slope = (y1 - y2) / (x1 - x2)
    intercept = y1 - slope * x1
    return slope * x + intercept