Coverage for backend/pinch_service/linearization/linearize_stream.py: 69%

222 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1from typing import List, Tuple 

2import numpy as np 

3from scipy.optimize import NonlinearConstraint, minimize 

4import matplotlib.pyplot as plt 

5import CoolProp.CoolProp as CP 

6# from .state_evaluation import StateEvaluation 

7 

8def serialize_states(s): 

9 if not isinstance(s, StateEvaluation): 9 ↛ 10line 9 didn't jump to line 10 because the condition on line 9 was never true

10 return {} 

11 return { 

12 "pressure": s.get_pressure(), 

13 "enthalpy": s.get_molar_enthalpy(), 

14 "temperature": s.get_temperature(), 

15 "enthalpy_mass": s.get_mass_enthalpy(), 

16 "entropy": s.get_molar_entropy(), 

17 "entropy_mass": s.get_mass_entropy(), 

18 "quality": s.get_vapour_fraction(), 

19 "total_energy_flow": s.get_total_energy_flow(), 

20 "relative_humidity": s.get_relative_humidity(), 

21 "mass_flow": s.get_mass_flow(), 

22 # "specific_volume": s.get_specific_volume(), 

23 # "density": s.get_density(), 

24 "volumetric_flow": s.get_volumetric_flow(), 

25 # "molecular_weight": s.get_molecular_weight(), 

26 # "molecular_weight_comp": s.get_component_molecular_weight(), 

27 } 

28 

29def generate_t_h_curve(ppKey: str, composition: list[tuple[str, float]], mole_flow: float, t_supply: float = None, t_target: float = None, p_supply: float = None, p_target: float = None, h_supply: float = None, h_target: float = None, num_points: int = 100): 

30 """ 

31 Generates a set of x, y points for a TH curve. 

32 :param: t_supply - Initial temperature of stream (K) 

33 :param: t_target - Final temperature of stream (K) 

34 :param: p_supply - Initial pressure of stream (Pa) 

35 :param: p_target - Final pressure of stream (Pa) 

36 :param: h_supply - Initial molar enthalpy of stream (J/kg) 

37 :param: h_target - Final molar enthalpy of stream (J/kg)  

38 :param: composition - list of Tuple(compound name, molar fraction) 

39 :param: num_points - Number of intervals to evaluate stream at 

40 :returns: Tuple(Heat Flow (J/kg), Temperatures (K)) 

41 """ 

42 # Generate temperature, pressure and enthalpy points 

43 pressures = np.linspace(p_supply, p_target, num_points) 

44 temperatures = None 

45 if ppKey == "peng-robinson": 45 ↛ 46line 45 didn't jump to line 46 because the condition on line 45 was never true

46 h_supply = None 

47 if h_supply is not None: 

48 molar_enthalpies = np.linspace(h_supply, h_target, num_points) 

49 else: 

50 temperatures = np.linspace(t_supply, t_target, num_points) 

51 

52 heat_flows = np.zeros(num_points) 

53 state = None 

54 

55 # Evaluate total energy (heat) flow or temperature depending on the selected independent variable 

56 try: 

57 if ppKey in ["humid_air", "milk", "helmholtz", "peng-robinson"]: 

58 if h_supply is not None: 58 ↛ 62line 58 didn't jump to line 62 because the condition on line 58 was always true

59 state = StateEvaluation(ppKey, composition, mole_flow=mole_flow, pressures=pressures, molar_enthalpies=molar_enthalpies) 

60 temperatures = state.get_temperature() 

61 else: 

62 state = StateEvaluation(ppKey, composition, mole_flow=mole_flow, pressures=pressures, temperatures=temperatures) 

63 

64 heat_flows = state.get_total_energy_flow() 

65 else: 

66 mixture_string = "HEOS::" + "&".join([f"{comp}[{frac}]" for comp, frac in composition]) 

67 if temperatures is not None: 67 ↛ 73line 67 didn't jump to line 73 because the condition on line 67 was always true

68 molar_enthalpies = np.zeros(num_points) 

69 for i in range(len(temperatures)): 

70 molar_enthalpies[i] = CP.PropsSI('HMOLAR', 'P', pressures[i], 'T', temperatures[i], mixture_string) 

71 heat_flows[i] = molar_enthalpies[i] * mole_flow 

72 else: 

73 temperatures = np.zeros(len(molar_enthalpies)) 

74 for i in range(len(molar_enthalpies)): 

75 temperatures[i] = CP.PropsSI('T', 'P', pressures[i], 'HMOLAR', molar_enthalpies[i], mixture_string) 

76 heat_flows[i] = molar_enthalpies[i] * mole_flow 

77 except: 

78 raise ValueError("Warning: Failed to compute enthalpy for T-h curve.") 

79 

80 return np.column_stack((heat_flows, temperatures)), state 

81 

82def rdp(curve: np.array, epsilon: float) -> np.array: 

83 """ 

84 Linearize and simplify a curve using the Ramer-Douglas-Peucker (RDP) algorithm. 

85 

86 :param curve: Array of points (N, 2). 

87 :param epsilon: Maximum allowed perpendicular distance for simplification. Can be considered the deviation tolerance or (t_min) 

88 :returns: Simplified array of points. 

89 """ 

90 n = len(curve) 

91 indices = np.ones(n, dtype=bool) 

92 stack = [[0, n - 1]] 

93 

94 while stack: 

95 start, end = stack.pop() 

96 dmax = 0.0 

97 index = start 

98 line_vector = curve[end] - curve[start] 

99 line_length = np.linalg.norm(line_vector) 

100 

101 if line_length == 0: 101 ↛ 102line 101 didn't jump to line 102 because the condition on line 101 was never true

102 continue 

103 

104 # Get point with max distance from line 

105 for i in range(start + 1, end): 

106 point_vector = curve[i] - curve[start] 

107 distance = abs(np.cross(line_vector, point_vector)) / line_length 

108 if distance > dmax: 

109 dmax = distance 

110 index = i 

111 

112 # Split if distance > epsilon (t_min)  

113 if dmax > epsilon: 

114 stack.append([start, index]) 

115 stack.append([index, end]) 

116 else: 

117 indices[start + 1:end] = False 

118 

119 return curve[indices] 

120 

121def refine_pw_points_for_heating_or_cooling(curve: np.array, pw_points: np.array, eps_lb: float=0.0, hot_stream: bool=True) -> np.array: 

122 """ 

123 Refines a piecewise linear approximation of a T-h profile to ensure feasibility as a hot or cold stream within eps. 

124 The refinement is done by optimising the piecewise points to minimise the L2 norm of the difference between the 

125 data and the piecewise points. 

126 

127 :param curve: Array of points (N, 2). 

128 :param eps_lb: Maximum allowed hot or cold stream violation. 

129 :param hot_stream: True if the stream is hot, False if the stream is cold. 

130 :returns: Simplified array of points, maximum error.  

131 """ 

132 # Ensure the data is a numpy array 

133 curve = np.flipud(curve) 

134 pw_points = np.flipud(pw_points) 

135 

136 # Remove the first and last points because they are fixed 

137 # Convert 2d array to 1d array 

138 x0 = pw_points[1:-1].flatten() 

139 

140 # Get arguments for the optimisation 

141 args = { 

142 'first_point' : pw_points[0], 

143 'last_point' : pw_points[-1], 

144 } 

145 

146 def delta_pw_and_data(x, args): 

147 '''Returns the difference between the data and the piecewise points''' 

148 # Reshape the piecewise points such that the first half are the x values and the second half are the y values 

149 new_pw_points = np.vstack((args['first_point'], x.reshape(-1, 2), args['last_point'])) 

150 # Interpolate the piecewise points to the original data points 

151 int_points = np.interp(curve[:, 0], new_pw_points[:, 0], new_pw_points[:, 1]) 

152 # Find the difference between the data and the piecewise points 

153 return int_points - curve[:, 1] 

154 

155 # Define the constraints for the optimisation 

156 if hot_stream: 156 ↛ 157line 156 didn't jump to line 157 because the condition on line 156 was never true

157 con = lambda x: np.max(delta_pw_and_data(x, args)) 

158 nlc = NonlinearConstraint(con, -np.inf, eps_lb) 

159 else: 

160 con = lambda x: np.min(delta_pw_and_data(x, args)) 

161 nlc = NonlinearConstraint(con, -eps_lb, np.inf) 

162 

163 def obj(x, args): 

164 '''Returns the L2 norm of the difference between the data and the piecewise points''' 

165 return np.sum( 

166 np.square( 

167 delta_pw_and_data(x, args) 

168 ) 

169 ) 

170 

171 # Perform the optimisation 

172 res = minimize(fun=obj, x0=x0, constraints=nlc, args=args, method='SLSQP', tol=1e-6) 

173 refined_pw_points = np.vstack((args['first_point'], res.x.reshape(-1, 2), args['last_point'])) 

174 return np.flipud(refined_pw_points), np.max(np.abs(delta_pw_and_data(res.x, args))) 

175 

176def get_piecewise_breakpoints(curve: np.array, epsilon: float, hot_stream: bool=True) -> np.array: 

177 """ 

178 Get the piecewise breakpoints for a curve using the Ramer-Douglas-Peucker (RDP) algorithm followed  

179 by an optimisation-based refinement to ensure hot and cold stream integrity for pinch analysis. 

180 

181 :param curve: Array of points (N, 2). 

182 :param epsilon: Maximum allowed perpendicular distance for simplification; the deviation tolerance or delta_t_dev. 

183 :param hot_stream: True if the stream is hot, False if the stream is cold. 

184 :returns: Simplified array of breakpoints that define the piecewise linearisation. 

185 """ 

186 for _ in range(10): 

187 pw_points = rdp(curve, epsilon=epsilon) 

188 

189 if len(pw_points) > 2: 

190 pw_points, max_err = refine_pw_points_for_heating_or_cooling(curve, pw_points, epsilon / 10, hot_stream) 

191 else: 

192 break 

193 

194 if max_err > epsilon: 194 ↛ 197line 194 didn't jump to line 197 because the condition on line 194 was always true

195 epsilon = epsilon * 0.9 

196 else: 

197 break 

198 

199 return pw_points 

200 

201def piecewise_curve(curve: list, epsilon: float, hot_stream:bool) -> np.array: 

202 """ 

203 Performs piecewise linearisation on a curve using the Ramer-Douglas-Peucker (RDP) algorithm. 

204 

205 :param curve: Numpy array of plot points for th curve 

206 :param epsilon: Maximum allowed temperature differential (t_min, tolerance etc) 

207 :returns: Numpy array of new curve points 

208 """ 

209 try: 

210 curve = np.array(curve) 

211 masked_values = get_piecewise_breakpoints(curve, epsilon=epsilon, hot_stream=hot_stream) 

212 except: 

213 masked_values = rdp(curve, epsilon=epsilon) 

214 return masked_values 

215 

216def plot_t_h_curve(points, title: str = "Temperature vs. Enthalpy") -> None: 

217 """ 

218 Plot Temperature vs. Enthalpy. 

219 :param points: tuple with columns 'Temperature (K)' and 'Enthalpy (kJ/mol)'. 

220 :param title: Title of the graph. 

221 :returns: None 

222 """ 

223 plt.figure(figsize=(10, 6)) 

224 plt.plot(points[:, 0], points[:, 1], marker="o", linestyle="-") ## not indexing tuppple correctly 

225 plt.title(title, fontsize=16) 

226 plt.ylabel("Temperature (K)", fontsize=14) 

227 plt.xlabel("Enthalpy (kJ/mol)", fontsize=14) 

228 plt.grid(True, linestyle="--", alpha=0.7) 

229 plt.xticks(fontsize=12) 

230 plt.yticks(fontsize=12) 

231 plt.tight_layout() 

232 

233def plot_t_h_curve_with_piecewise_and_bounds(points: np.array, piecewise_points: np.array, epsilon: float, title: str = "Temperature vs. Enthalpy") -> None: 

234 """ 

235 Plot the TH curve, its piecewise linearization, and a shaded region ±epsilon around the TH curve. 

236 :param points: Original TH curve points. 

237 :param piecewise_points: Simplified piecewise linear curve points. 

238 :param epsilon: Epsilon value for shading. 

239 :param title: Title of the graph. 

240 """ 

241 enthalpies, temperatures = points[:, 0], points[:, 1] 

242 upper_bound = [e + epsilon for e in temperatures] 

243 lower_bound = [e - epsilon for e in temperatures] 

244 

245 plt.figure(figsize=(10, 6)) 

246 plt.plot(enthalpies, temperatures, label="TH Curve", color="red", linewidth=1.5) 

247 plt.plot(piecewise_points[:, 0], piecewise_points[:, 1], label="Piecewise Curve", color="blue", linestyle="--", marker="o", linewidth=2) ## pwl curve should be dashed and blue 

248 plt.fill_between(enthalpies, lower_bound, upper_bound, color="lightblue", alpha=0.3, label=f"±{epsilon} Bounds") 

249 

250 plt.title(title, fontsize=16) 

251 plt.xlabel("Enthalpy (J/mol)", fontsize=14) 

252 plt.ylabel("Temperature (K)", fontsize=14) 

253 plt.legend(fontsize=12) 

254 plt.grid(True, linestyle="--", alpha=0.7) 

255 plt.tight_layout() 

256 plt.show() 

257 

258 

259 

260 

261from property_packages.build_package import build_package 

262from pyomo.environ import value, ConcreteModel, RangeSet 

263from idaes.core import FlowsheetBlock 

264from idaes.core.solvers import get_solver 

265 

266def check_DOF(ls): 

267 return 2 - sum([1 for x in ls if x is not None]) 

268 

269def check_state_lengths(ls): 

270 l = [len(x) if x is not None else 0 for x in ls ] 

271 l.sort() 

272 return l[-1] == l[-2] 

273 

274class StateEvaluation: 

275 m = ConcreteModel() 

276 m.fs = FlowsheetBlock(dynamic=False) 

277 solver = get_solver("ipopt") 

278 

279 def __init__(self, ppKey: str, composition, mole_flow: float, 

280 pressures: list = None, temperatures: list = None, 

281 molar_enthalpies: list = None, molar_entropies: list = None) -> None: 

282 # Build property package 

283 self.m.fs.properties = build_package(ppKey, [comp for comp, _ in composition]) 

284 var = [pressures, temperatures, molar_enthalpies, molar_entropies] 

285 if check_DOF(var) > 0: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true

286 raise ValueError("Insufficient state variables provided.") 

287 elif check_DOF(var) < 0: 287 ↛ 288line 287 didn't jump to line 288 because the condition on line 287 was never true

288 raise ValueError("Too many state variables provided.") 

289 if check_state_lengths(var) == False: 289 ↛ 290line 289 didn't jump to line 290 because the condition on line 289 was never true

290 raise ValueError("Number of states in the different variables must match.") 

291 

292 n_states = max([len(x) if x is not None else 0 for x in var ]) 

293 self.m.fs.state_index = RangeSet(0, n_states - 1) 

294 

295 # Build multiple state blocks 

296 self.m.fs.sb = self.m.fs.properties.build_state_block( 

297 self.m.fs.state_index, defined_state=True 

298 ) 

299 

300 # Fix composition for each state block 

301 for i in self.m.fs.state_index: 

302 if len(composition) > 1: 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true

303 for comp, frac in composition: 

304 self.m.fs.sb[i].mole_frac_comp[comp].fix(frac) 

305 self.m.fs.sb[i].flow_mol.fix(mole_flow) 

306 

307 if pressures is not None: 307 ↛ 309line 307 didn't jump to line 309 because the condition on line 307 was always true

308 self.m.fs.sb[i].pressure.fix(pressures[i]) 

309 if temperatures is not None: 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true

310 self.m.fs.sb[i].temperature.fix(temperatures[i]) 

311 if ppKey == "milk": 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true

312 if molar_enthalpies is not None: 

313 self.m.fs.sb[i].constrain("enth_mol", molar_enthalpies[i]) 

314 if molar_entropies is not None: 

315 self.m.fs.sb[i].constrain("entr_mol", molar_entropies[i]) 

316 else: 

317 if molar_enthalpies is not None: 317 ↛ 319line 317 didn't jump to line 319 because the condition on line 317 was always true

318 self.m.fs.sb[i].enth_mol.fix(molar_enthalpies[i]) 

319 if molar_entropies is not None: 319 ↛ 320line 319 didn't jump to line 320 because the condition on line 319 was never true

320 self.m.fs.sb[i].entr_mol.fix(molar_entropies[i]) 

321 

322 # Initialize and solve all state blocks 

323 self.m.fs.sb.initialize() 

324 try: 

325 self.solver.solve(self.m, tee=False) 

326 except ValueError as e: 

327 if str(e) == "No variables appear in the Pyomo model constraints or objective. This is not supported by the NL file interface": 327 ↛ 330line 327 didn't jump to line 330 because the condition on line 327 was always true

328 print("No need to solve for a solution. Pass") 

329 else: 

330 print(f"Solver failed with error: {e}") 

331 raise 

332 

333 def get_property(self, prop, state_index: int = None): 

334 try: 

335 if state_index is None: 335 ↛ 338line 335 didn't jump to line 338 because the condition on line 335 was always true

336 return self.get_all_properties(prop) 

337 else: 

338 return self.get_single_property(prop, state_index) 

339 except AttributeError: 

340 print(f"Property '{prop}' not found in state block.") 

341 return None 

342 

343 def get_single_property(self, prop: str, state_index: int = 0): 

344 return [value(getattr(self.m.fs.sb[state_index], prop))] 

345 

346 def get_all_properties(self, prop: str): 

347 return [value(getattr(self.m.fs.sb[i], prop)) for i in self.m.fs.state_index.ordered_data()] 

348 

349 def get_pressure(self, state_index: int = None): 

350 return self.get_property('pressure', state_index) 

351 

352 def get_molar_enthalpy(self, state_index: int = None): 

353 return self.get_property('enth_mol', state_index) 

354 

355 def get_mass_enthalpy(self, state_index: int = None): 

356 return self.get_property('enth_mass', state_index) 

357 

358 def get_temperature(self, state_index: int = None): 

359 return self.get_property('temperature', state_index) 

360 

361 def get_molar_entropy(self, state_index: int = None): 

362 return self.get_property('entr_mol', state_index) 

363 

364 def get_mass_entropy(self, state_index: int = None): 

365 return self.get_property('entr_mass', state_index) 

366 

367 def get_vapour_fraction(self, state_index: int = None): 

368 return self.get_property('vap_frac', state_index) 

369 

370 def get_total_energy_flow(self, state_index: int = None): 

371 return self.get_property('total_energy_flow', state_index) 

372 

373 def get_relative_humidity(self, state_index: int = None): 

374 return self.get_property('relative_humidity', state_index) 

375 

376 def get_mass_flow(self, state_index: int = None): 

377 return self.get_property('flow_mass', state_index) 

378 

379 def get_specific_volume(self, state_index: int = None): 

380 return self.get_volumetric_flow(state_index) / self.get_mass_flow(state_index) 

381 

382 def get_density(self, state_index: int = None): 

383 return self.get_mass_flow(state_index) / self.get_volumetric_flow(state_index) 

384 

385 def get_volumetric_flow(self, state_index: int = None): 

386 return self.get_property('flow_vol', state_index) 

387 

388 def get_molecular_weight(self): 

389 mw_total = 0.0 

390 for comp in self.m.fs.sb[0].mole_frac_comp: 

391 mw = value(self.m.fs.sb[0].params.mw_comp[comp]) 

392 x = value(self.m.fs.sb[0].mole_frac_comp[comp]) 

393 mw_total += mw * x 

394 return mw_total 

395 

396 def get_component_molecular_weight(self, comp): 

397 return value(self.m.fs.sb[0].params.mw_comp[comp])