Coverage for backend/pinch_service/OpenPinch/src/analysis/support_methods.py: 68%

107 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1from __future__ import annotations 

2import numpy as np 

3import math 

4from typing import List, Tuple, Union 

5from ..lib import * 

6from ..classes.problem_table import ProblemTable 

7 

8 

9def get_pinch_loc(pt: ProblemTable, col=PT.H_NET.value) -> tuple[int, int, bool]: 

10 """Returns the row indices of the Hot and Cold Pinch Temperatures.""" 

11 # Pull out the 1-D profile as a NumPy array 

12 h_net = np.asarray(pt.col[col]) 

13 n = h_net.size 

14 

15 abs_arr = np.abs(h_net) 

16 zeros_mask = abs_arr < ZERO 

17 

18 if np.all(zeros_mask) == False and np.any(zeros_mask): 

19 # ---------- Hot-pinch (scan top-down) ---------- 

20 first_zero = np.flatnonzero(zeros_mask)[0] 

21 if first_zero > 0: # zero not on the first row 

22 row_h = first_zero 

23 else: # zero *is* the first row 

24 nz_after = np.flatnonzero(~zeros_mask) # first genuinely non-zero row 

25 row_h = nz_after[0] - 1 if nz_after.size else n - 1 

26 

27 # ---------- Cold-pinch (scan bottom-up) ---------- 

28 last_zero = np.flatnonzero(zeros_mask)[-1] 

29 if last_zero < n - 1: # zero not on the last row 

30 row_c = last_zero 

31 else: # zero *is* the last row 

32 # find first non-zero when walking *up* from the bottom 

33 nz_before_rev = np.flatnonzero(~zeros_mask[::-1]) 

34 row_c = n - nz_before_rev[0] if nz_before_rev.size else 0 

35 else: 

36 row_h = n - 1 

37 row_c = 0 

38 

39 valid = row_h <= row_c 

40 return row_h, row_c, valid 

41 

42 

43def get_pinch_temperatures(pt: ProblemTable, col_T: str =PT.T.value, col_H=PT.H_NET.value) -> tuple[float | None, float | None]: 

44 """Determines the hottest hot Pinch Temperature and coldest cold Pinch Temperature and return both values.""" 

45 h_loc, c_loc, valid = get_pinch_loc(pt, col_H) 

46 if valid: 46 ↛ 49line 46 didn't jump to line 49 because the condition on line 46 was always true

47 return pt.loc[h_loc, col_T], pt.loc[c_loc, col_T] 

48 else: 

49 return None, None 

50 

51 

52def shift_heat_cascade(pt: ProblemTable, dh: float, col: Union[int, str, Enum]) -> ProblemTable: 

53 """Shifts a column in a heat cascade DataFrame by dH.""" 

54 pt.col[col] += dh 

55 return pt.copy 

56 

57 

58def insert_temperature_interval_into_pt(pt: ProblemTable, T_ls: List[float] | float) -> Tuple[ProblemTable, int]: 

59 """Efficient insert into a ProblemTable assuming strictly descending T column.""" 

60 if isinstance(T_ls, float): 

61 T_ls = [T_ls] 

62 

63 for T_new in T_ls: 

64 col = pt.col_index 

65 T_col = pt.data[:, col[PT.T.value]] 

66 

67 # Vectorized scan for insert index 

68 insert_index = None 

69 for i in range(1, len(T_col)): 69 ↛ 74line 69 didn't jump to line 74 because the loop on line 69 didn't complete

70 if T_col[i - 1] - ZERO > T_new > T_col[i] + ZERO: 

71 insert_index = i 

72 break 

73 

74 if insert_index is None: 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true

75 return pt, 0 # already exists 

76 

77 row_top = pt.data[insert_index - 1] 

78 row_bot = pt.data[insert_index] 

79 

80 cp_hot = row_bot[col[PT.CP_HOT.value]] 

81 cp_cold = row_bot[col[PT.CP_COLD.value]] 

82 mcp_net = row_bot[col[PT.MCP_NET.value]] 

83 

84 delta_above = row_top[col[PT.T.value]] - T_new 

85 delta_below = T_new - row_bot[col[PT.T.value]] 

86 

87 row_dict = { 

88 PT.T.value: T_new, 

89 PT.DELTA_T.value: delta_above, 

90 PT.CP_HOT.value: cp_hot, 

91 PT.DELTA_H_HOT.value: delta_above * cp_hot, 

92 PT.CP_COLD.value: cp_cold, 

93 PT.DELTA_H_COLD.value: delta_above * cp_cold, 

94 PT.MCP_NET.value: mcp_net, 

95 PT.DELTA_H_NET.value: delta_above * mcp_net, 

96 } 

97 

98 icol_T = col[PT.T.value] 

99 for key in [ 

100 PT.H_HOT.value, PT.H_COLD.value, PT.H_NET.value, 

101 PT.H_NET_NP.value, PT.H_NET_A.value, PT.H_NET_V.value, 

102 ]: 

103 i = col[key] 

104 if not np.isnan(row_bot[i]): 

105 row_dict[key] = linear_interpolation( 

106 T_new, 

107 row_bot[icol_T], 

108 row_top[icol_T], 

109 row_bot[i], 

110 row_top[i] 

111 ) 

112 

113 # Insert and update next row 

114 pt.insert(row_dict, insert_index) 

115 

116 pt.data[insert_index + 1, col[PT.DELTA_T.value]] = delta_below 

117 pt.data[insert_index + 1, col[PT.DELTA_H_HOT.value]] = delta_below * cp_hot 

118 pt.data[insert_index + 1, col[PT.DELTA_H_COLD.value]] = delta_below * cp_cold 

119 pt.data[insert_index + 1, col[PT.DELTA_H_NET.value]] = delta_below * mcp_net 

120 

121 return pt, 1 

122 

123 

124def key_name(zone_name: str, target_type: str = TargetType.DI.value): 

125 return f"{zone_name}/{target_type}" 

126 

127 

128def get_value(val: Union[float, dict, ValueWithUnit]) -> float: 

129 if isinstance(val, float): 

130 return val 

131 elif isinstance(val, dict): 131 ↛ 132line 131 didn't jump to line 132 because the condition on line 131 was never true

132 return val['value'] 

133 elif isinstance(val, ValueWithUnit): 133 ↛ 136line 133 didn't jump to line 136 because the condition on line 133 was always true

134 return val.value 

135 else: 

136 raise TypeError(f"Unsupported type: {type(val)}. Expected float, dict, or ValueWithUnit.") 

137 

138 

139def find_LMTD(T_hot_in: float, T_hot_out: float, T_cold_in: float, T_cold_out: float) -> float: 

140 """Returns the log mean temperature difference (LMTD) for a counterflow heat exchanger.""" 

141 # Check temperature directions for counter-current assumption 

142 if T_hot_in < T_hot_out: 

143 raise ValueError("Hot fluid must cool down (T_hot_in > T_hot_out)") 

144 if T_cold_out < T_cold_in: 

145 raise ValueError("Cold fluid must heat up (T_cold_out > T_cold_in)") 

146 

147 delta_T1 = T_hot_in - T_cold_out # Inlet diff (hottest hot - hottest cold) 

148 delta_T2 = T_hot_out - T_cold_in # Outlet diff (coldest hot - coldest cold) 

149 

150 if delta_T1 <= 0 or delta_T2 <= 0: 

151 raise ValueError(f"Invalid temperature differences: ΔT1={delta_T1}, ΔT2={delta_T2}") 

152 

153 if math.isclose(delta_T1, delta_T2, rel_tol=1e-6): 

154 return delta_T1 # or delta_T2 — they're equal 

155 

156 return (delta_T1 - delta_T2) / math.log(delta_T1 / delta_T2) 

157 

158 

159def capital_recovery_factor(interest_rate: float, years: int) -> float: 

160 """Calculates the Capital Recovery Factor (CRF), also known as the annualisation factor.""" 

161 i = interest_rate 

162 n = years 

163 return i * (1 + i) ** n / ((1 + i) ** n - 1) 

164 

165 

166def compute_exergetic_temperature(T: float, T_ref_in_C: float = 15.0, units_of_T: str = "C") -> float: 

167 """Calculate the exergetic temperature difference relative to T_ref (in °C or K).""" 

168 # Marmolejo-Correa, D., Gundersen, T., 2013. New Graphical Representation of Exergy Applied to Low Temperature Process Design.  

169 # Industrial & Engineering Chemistry Research 52, 7145–7156. https://doi.org/10.1021/ie302541e 

170 if units_of_T not in ("C", "K"): 

171 raise ValueError("units must be either 'C' or 'K'") 

172 

173 T_amb = T_ref_in_C + C_to_K # Convert reference to Kelvin 

174 T_K = T + C_to_K if units_of_T == "C" else T 

175 

176 if T_K <= 0: 

177 raise ValueError("Absolute temperature must be > 0 K") 

178 

179 ratio = T_K / T_amb 

180 return T_amb * (ratio - 1 - math.log(ratio)) 

181 

182 

183def linear_interpolation(x: float, x1: float, x2: float, y1: float, y2: float) -> float: 

184 """Performs linear interpolation to estimate y at a given x, using two known points (x1, y1) and (x2, y2).""" 

185 if x1 == x2: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true

186 raise ValueError("Cannot perform interpolation when x1 == x2 (undefined slope).") 

187 m = (y1 - y2) / (x1 - x2) 

188 c = y1 - m * x1 

189 return m * x + c