Coverage for backend/pinch_service/linearization/linearize_stream.py: 69%
222 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1from typing import List, Tuple
2import numpy as np
3from scipy.optimize import NonlinearConstraint, minimize
4import matplotlib.pyplot as plt
5import CoolProp.CoolProp as CP
6# from .state_evaluation import StateEvaluation
8def serialize_states(s):
9 if not isinstance(s, StateEvaluation): 9 ↛ 10line 9 didn't jump to line 10 because the condition on line 9 was never true
10 return {}
11 return {
12 "pressure": s.get_pressure(),
13 "enthalpy": s.get_molar_enthalpy(),
14 "temperature": s.get_temperature(),
15 "enthalpy_mass": s.get_mass_enthalpy(),
16 "entropy": s.get_molar_entropy(),
17 "entropy_mass": s.get_mass_entropy(),
18 "quality": s.get_vapour_fraction(),
19 "total_energy_flow": s.get_total_energy_flow(),
20 "relative_humidity": s.get_relative_humidity(),
21 "mass_flow": s.get_mass_flow(),
22 # "specific_volume": s.get_specific_volume(),
23 # "density": s.get_density(),
24 "volumetric_flow": s.get_volumetric_flow(),
25 # "molecular_weight": s.get_molecular_weight(),
26 # "molecular_weight_comp": s.get_component_molecular_weight(),
27 }
29def generate_t_h_curve(ppKey: str, composition: list[tuple[str, float]], mole_flow: float, t_supply: float = None, t_target: float = None, p_supply: float = None, p_target: float = None, h_supply: float = None, h_target: float = None, num_points: int = 100):
30 """
31 Generates a set of x, y points for a TH curve.
32 :param: t_supply - Initial temperature of stream (K)
33 :param: t_target - Final temperature of stream (K)
34 :param: p_supply - Initial pressure of stream (Pa)
35 :param: p_target - Final pressure of stream (Pa)
36 :param: h_supply - Initial molar enthalpy of stream (J/kg)
37 :param: h_target - Final molar enthalpy of stream (J/kg)
38 :param: composition - list of Tuple(compound name, molar fraction)
39 :param: num_points - Number of intervals to evaluate stream at
40 :returns: Tuple(Heat Flow (J/kg), Temperatures (K))
41 """
42 # Generate temperature, pressure and enthalpy points
43 pressures = np.linspace(p_supply, p_target, num_points)
44 temperatures = None
45 if ppKey == "peng-robinson": 45 ↛ 46line 45 didn't jump to line 46 because the condition on line 45 was never true
46 h_supply = None
47 if h_supply is not None:
48 molar_enthalpies = np.linspace(h_supply, h_target, num_points)
49 else:
50 temperatures = np.linspace(t_supply, t_target, num_points)
52 heat_flows = np.zeros(num_points)
53 state = None
55 # Evaluate total energy (heat) flow or temperature depending on the selected independent variable
56 try:
57 if ppKey in ["humid_air", "milk", "helmholtz", "peng-robinson"]:
58 if h_supply is not None: 58 ↛ 62line 58 didn't jump to line 62 because the condition on line 58 was always true
59 state = StateEvaluation(ppKey, composition, mole_flow=mole_flow, pressures=pressures, molar_enthalpies=molar_enthalpies)
60 temperatures = state.get_temperature()
61 else:
62 state = StateEvaluation(ppKey, composition, mole_flow=mole_flow, pressures=pressures, temperatures=temperatures)
64 heat_flows = state.get_total_energy_flow()
65 else:
66 mixture_string = "HEOS::" + "&".join([f"{comp}[{frac}]" for comp, frac in composition])
67 if temperatures is not None: 67 ↛ 73line 67 didn't jump to line 73 because the condition on line 67 was always true
68 molar_enthalpies = np.zeros(num_points)
69 for i in range(len(temperatures)):
70 molar_enthalpies[i] = CP.PropsSI('HMOLAR', 'P', pressures[i], 'T', temperatures[i], mixture_string)
71 heat_flows[i] = molar_enthalpies[i] * mole_flow
72 else:
73 temperatures = np.zeros(len(molar_enthalpies))
74 for i in range(len(molar_enthalpies)):
75 temperatures[i] = CP.PropsSI('T', 'P', pressures[i], 'HMOLAR', molar_enthalpies[i], mixture_string)
76 heat_flows[i] = molar_enthalpies[i] * mole_flow
77 except:
78 raise ValueError("Warning: Failed to compute enthalpy for T-h curve.")
80 return np.column_stack((heat_flows, temperatures)), state
82def rdp(curve: np.array, epsilon: float) -> np.array:
83 """
84 Linearize and simplify a curve using the Ramer-Douglas-Peucker (RDP) algorithm.
86 :param curve: Array of points (N, 2).
87 :param epsilon: Maximum allowed perpendicular distance for simplification. Can be considered the deviation tolerance or (t_min)
88 :returns: Simplified array of points.
89 """
90 n = len(curve)
91 indices = np.ones(n, dtype=bool)
92 stack = [[0, n - 1]]
94 while stack:
95 start, end = stack.pop()
96 dmax = 0.0
97 index = start
98 line_vector = curve[end] - curve[start]
99 line_length = np.linalg.norm(line_vector)
101 if line_length == 0: 101 ↛ 102line 101 didn't jump to line 102 because the condition on line 101 was never true
102 continue
104 # Get point with max distance from line
105 for i in range(start + 1, end):
106 point_vector = curve[i] - curve[start]
107 distance = abs(np.cross(line_vector, point_vector)) / line_length
108 if distance > dmax:
109 dmax = distance
110 index = i
112 # Split if distance > epsilon (t_min)
113 if dmax > epsilon:
114 stack.append([start, index])
115 stack.append([index, end])
116 else:
117 indices[start + 1:end] = False
119 return curve[indices]
121def refine_pw_points_for_heating_or_cooling(curve: np.array, pw_points: np.array, eps_lb: float=0.0, hot_stream: bool=True) -> np.array:
122 """
123 Refines a piecewise linear approximation of a T-h profile to ensure feasibility as a hot or cold stream within eps.
124 The refinement is done by optimising the piecewise points to minimise the L2 norm of the difference between the
125 data and the piecewise points.
127 :param curve: Array of points (N, 2).
128 :param eps_lb: Maximum allowed hot or cold stream violation.
129 :param hot_stream: True if the stream is hot, False if the stream is cold.
130 :returns: Simplified array of points, maximum error.
131 """
132 # Ensure the data is a numpy array
133 curve = np.flipud(curve)
134 pw_points = np.flipud(pw_points)
136 # Remove the first and last points because they are fixed
137 # Convert 2d array to 1d array
138 x0 = pw_points[1:-1].flatten()
140 # Get arguments for the optimisation
141 args = {
142 'first_point' : pw_points[0],
143 'last_point' : pw_points[-1],
144 }
146 def delta_pw_and_data(x, args):
147 '''Returns the difference between the data and the piecewise points'''
148 # Reshape the piecewise points such that the first half are the x values and the second half are the y values
149 new_pw_points = np.vstack((args['first_point'], x.reshape(-1, 2), args['last_point']))
150 # Interpolate the piecewise points to the original data points
151 int_points = np.interp(curve[:, 0], new_pw_points[:, 0], new_pw_points[:, 1])
152 # Find the difference between the data and the piecewise points
153 return int_points - curve[:, 1]
155 # Define the constraints for the optimisation
156 if hot_stream: 156 ↛ 157line 156 didn't jump to line 157 because the condition on line 156 was never true
157 con = lambda x: np.max(delta_pw_and_data(x, args))
158 nlc = NonlinearConstraint(con, -np.inf, eps_lb)
159 else:
160 con = lambda x: np.min(delta_pw_and_data(x, args))
161 nlc = NonlinearConstraint(con, -eps_lb, np.inf)
163 def obj(x, args):
164 '''Returns the L2 norm of the difference between the data and the piecewise points'''
165 return np.sum(
166 np.square(
167 delta_pw_and_data(x, args)
168 )
169 )
171 # Perform the optimisation
172 res = minimize(fun=obj, x0=x0, constraints=nlc, args=args, method='SLSQP', tol=1e-6)
173 refined_pw_points = np.vstack((args['first_point'], res.x.reshape(-1, 2), args['last_point']))
174 return np.flipud(refined_pw_points), np.max(np.abs(delta_pw_and_data(res.x, args)))
176def get_piecewise_breakpoints(curve: np.array, epsilon: float, hot_stream: bool=True) -> np.array:
177 """
178 Get the piecewise breakpoints for a curve using the Ramer-Douglas-Peucker (RDP) algorithm followed
179 by an optimisation-based refinement to ensure hot and cold stream integrity for pinch analysis.
181 :param curve: Array of points (N, 2).
182 :param epsilon: Maximum allowed perpendicular distance for simplification; the deviation tolerance or delta_t_dev.
183 :param hot_stream: True if the stream is hot, False if the stream is cold.
184 :returns: Simplified array of breakpoints that define the piecewise linearisation.
185 """
186 for _ in range(10):
187 pw_points = rdp(curve, epsilon=epsilon)
189 if len(pw_points) > 2:
190 pw_points, max_err = refine_pw_points_for_heating_or_cooling(curve, pw_points, epsilon / 10, hot_stream)
191 else:
192 break
194 if max_err > epsilon: 194 ↛ 197line 194 didn't jump to line 197 because the condition on line 194 was always true
195 epsilon = epsilon * 0.9
196 else:
197 break
199 return pw_points
201def piecewise_curve(curve: list, epsilon: float, hot_stream:bool) -> np.array:
202 """
203 Performs piecewise linearisation on a curve using the Ramer-Douglas-Peucker (RDP) algorithm.
205 :param curve: Numpy array of plot points for th curve
206 :param epsilon: Maximum allowed temperature differential (t_min, tolerance etc)
207 :returns: Numpy array of new curve points
208 """
209 try:
210 curve = np.array(curve)
211 masked_values = get_piecewise_breakpoints(curve, epsilon=epsilon, hot_stream=hot_stream)
212 except:
213 masked_values = rdp(curve, epsilon=epsilon)
214 return masked_values
216def plot_t_h_curve(points, title: str = "Temperature vs. Enthalpy") -> None:
217 """
218 Plot Temperature vs. Enthalpy.
219 :param points: tuple with columns 'Temperature (K)' and 'Enthalpy (kJ/mol)'.
220 :param title: Title of the graph.
221 :returns: None
222 """
223 plt.figure(figsize=(10, 6))
224 plt.plot(points[:, 0], points[:, 1], marker="o", linestyle="-") ## not indexing tuppple correctly
225 plt.title(title, fontsize=16)
226 plt.ylabel("Temperature (K)", fontsize=14)
227 plt.xlabel("Enthalpy (kJ/mol)", fontsize=14)
228 plt.grid(True, linestyle="--", alpha=0.7)
229 plt.xticks(fontsize=12)
230 plt.yticks(fontsize=12)
231 plt.tight_layout()
233def plot_t_h_curve_with_piecewise_and_bounds(points: np.array, piecewise_points: np.array, epsilon: float, title: str = "Temperature vs. Enthalpy") -> None:
234 """
235 Plot the TH curve, its piecewise linearization, and a shaded region ±epsilon around the TH curve.
236 :param points: Original TH curve points.
237 :param piecewise_points: Simplified piecewise linear curve points.
238 :param epsilon: Epsilon value for shading.
239 :param title: Title of the graph.
240 """
241 enthalpies, temperatures = points[:, 0], points[:, 1]
242 upper_bound = [e + epsilon for e in temperatures]
243 lower_bound = [e - epsilon for e in temperatures]
245 plt.figure(figsize=(10, 6))
246 plt.plot(enthalpies, temperatures, label="TH Curve", color="red", linewidth=1.5)
247 plt.plot(piecewise_points[:, 0], piecewise_points[:, 1], label="Piecewise Curve", color="blue", linestyle="--", marker="o", linewidth=2) ## pwl curve should be dashed and blue
248 plt.fill_between(enthalpies, lower_bound, upper_bound, color="lightblue", alpha=0.3, label=f"±{epsilon} Bounds")
250 plt.title(title, fontsize=16)
251 plt.xlabel("Enthalpy (J/mol)", fontsize=14)
252 plt.ylabel("Temperature (K)", fontsize=14)
253 plt.legend(fontsize=12)
254 plt.grid(True, linestyle="--", alpha=0.7)
255 plt.tight_layout()
256 plt.show()
261from property_packages.build_package import build_package
262from pyomo.environ import value, ConcreteModel, RangeSet
263from idaes.core import FlowsheetBlock
264from idaes.core.solvers import get_solver
266def check_DOF(ls):
267 return 2 - sum([1 for x in ls if x is not None])
269def check_state_lengths(ls):
270 l = [len(x) if x is not None else 0 for x in ls ]
271 l.sort()
272 return l[-1] == l[-2]
274class StateEvaluation:
275 m = ConcreteModel()
276 m.fs = FlowsheetBlock(dynamic=False)
277 solver = get_solver("ipopt")
279 def __init__(self, ppKey: str, composition, mole_flow: float,
280 pressures: list = None, temperatures: list = None,
281 molar_enthalpies: list = None, molar_entropies: list = None) -> None:
282 # Build property package
283 self.m.fs.properties = build_package(ppKey, [comp for comp, _ in composition])
284 var = [pressures, temperatures, molar_enthalpies, molar_entropies]
285 if check_DOF(var) > 0: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true
286 raise ValueError("Insufficient state variables provided.")
287 elif check_DOF(var) < 0: 287 ↛ 288line 287 didn't jump to line 288 because the condition on line 287 was never true
288 raise ValueError("Too many state variables provided.")
289 if check_state_lengths(var) == False: 289 ↛ 290line 289 didn't jump to line 290 because the condition on line 289 was never true
290 raise ValueError("Number of states in the different variables must match.")
292 n_states = max([len(x) if x is not None else 0 for x in var ])
293 self.m.fs.state_index = RangeSet(0, n_states - 1)
295 # Build multiple state blocks
296 self.m.fs.sb = self.m.fs.properties.build_state_block(
297 self.m.fs.state_index, defined_state=True
298 )
300 # Fix composition for each state block
301 for i in self.m.fs.state_index:
302 if len(composition) > 1: 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true
303 for comp, frac in composition:
304 self.m.fs.sb[i].mole_frac_comp[comp].fix(frac)
305 self.m.fs.sb[i].flow_mol.fix(mole_flow)
307 if pressures is not None: 307 ↛ 309line 307 didn't jump to line 309 because the condition on line 307 was always true
308 self.m.fs.sb[i].pressure.fix(pressures[i])
309 if temperatures is not None: 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true
310 self.m.fs.sb[i].temperature.fix(temperatures[i])
311 if ppKey == "milk": 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true
312 if molar_enthalpies is not None:
313 self.m.fs.sb[i].constrain("enth_mol", molar_enthalpies[i])
314 if molar_entropies is not None:
315 self.m.fs.sb[i].constrain("entr_mol", molar_entropies[i])
316 else:
317 if molar_enthalpies is not None: 317 ↛ 319line 317 didn't jump to line 319 because the condition on line 317 was always true
318 self.m.fs.sb[i].enth_mol.fix(molar_enthalpies[i])
319 if molar_entropies is not None: 319 ↛ 320line 319 didn't jump to line 320 because the condition on line 319 was never true
320 self.m.fs.sb[i].entr_mol.fix(molar_entropies[i])
322 # Initialize and solve all state blocks
323 self.m.fs.sb.initialize()
324 try:
325 self.solver.solve(self.m, tee=False)
326 except ValueError as e:
327 if str(e) == "No variables appear in the Pyomo model constraints or objective. This is not supported by the NL file interface": 327 ↛ 330line 327 didn't jump to line 330 because the condition on line 327 was always true
328 print("No need to solve for a solution. Pass")
329 else:
330 print(f"Solver failed with error: {e}")
331 raise
333 def get_property(self, prop, state_index: int = None):
334 try:
335 if state_index is None: 335 ↛ 338line 335 didn't jump to line 338 because the condition on line 335 was always true
336 return self.get_all_properties(prop)
337 else:
338 return self.get_single_property(prop, state_index)
339 except AttributeError:
340 print(f"Property '{prop}' not found in state block.")
341 return None
343 def get_single_property(self, prop: str, state_index: int = 0):
344 return [value(getattr(self.m.fs.sb[state_index], prop))]
346 def get_all_properties(self, prop: str):
347 return [value(getattr(self.m.fs.sb[i], prop)) for i in self.m.fs.state_index.ordered_data()]
349 def get_pressure(self, state_index: int = None):
350 return self.get_property('pressure', state_index)
352 def get_molar_enthalpy(self, state_index: int = None):
353 return self.get_property('enth_mol', state_index)
355 def get_mass_enthalpy(self, state_index: int = None):
356 return self.get_property('enth_mass', state_index)
358 def get_temperature(self, state_index: int = None):
359 return self.get_property('temperature', state_index)
361 def get_molar_entropy(self, state_index: int = None):
362 return self.get_property('entr_mol', state_index)
364 def get_mass_entropy(self, state_index: int = None):
365 return self.get_property('entr_mass', state_index)
367 def get_vapour_fraction(self, state_index: int = None):
368 return self.get_property('vap_frac', state_index)
370 def get_total_energy_flow(self, state_index: int = None):
371 return self.get_property('total_energy_flow', state_index)
373 def get_relative_humidity(self, state_index: int = None):
374 return self.get_property('relative_humidity', state_index)
376 def get_mass_flow(self, state_index: int = None):
377 return self.get_property('flow_mass', state_index)
379 def get_specific_volume(self, state_index: int = None):
380 return self.get_volumetric_flow(state_index) / self.get_mass_flow(state_index)
382 def get_density(self, state_index: int = None):
383 return self.get_mass_flow(state_index) / self.get_volumetric_flow(state_index)
385 def get_volumetric_flow(self, state_index: int = None):
386 return self.get_property('flow_vol', state_index)
388 def get_molecular_weight(self):
389 mw_total = 0.0
390 for comp in self.m.fs.sb[0].mole_frac_comp:
391 mw = value(self.m.fs.sb[0].params.mw_comp[comp])
392 x = value(self.m.fs.sb[0].mole_frac_comp[comp])
393 mw_total += mw * x
394 return mw_total
396 def get_component_molecular_weight(self, comp):
397 return value(self.m.fs.sb[0].params.mw_comp[comp])