Coverage for backend/ahuora-compounds/ahuora_property_packages/base/flexible_state_block.py: 19%
95 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1from pyomo.environ import Constraint, check_optimal_termination
2from watertap.core.solvers import get_solver
3from idaes.core.util.initialization import fix_state_vars, solve_indexed_blocks
4from idaes.core.util.model_statistics import degrees_of_freedom, number_unfixed_variables
5from idaes.core.util.exceptions import InitializationError
6import idaes.logger as idaeslog
7from pyomo.core.base.expression import ScalarExpression, IndexedExpression, Expression, ExpressionData
8from pyomo.environ import Var, ScalarVar
9from pyomo.core.base.var import IndexedVar
10import pyomo.environ as pyo
11from idaes.core import (
12 StateBlock,
13)
15class ExpressionConversionError(Exception):
16 pass
19class FlexibleStateBlockData():
21 def build(blk, *args):
22 blk.add_extra_expressions()
23 # We initialise vars_to_deactivate here with __setattr__ instead of doing it
24 # in the constructor of _SeawaterStateBlockConstraints as blk.vars_to_deactivate = [],
25 # because on state blocks, missing attributes are not treated like normal Python objects.
26 # Why the below works is because it bypasses the custom __setattr__ logic (which makes it a metadata)
27 # on the block and writes directly to the object.
28 # Also, initializing here guarantees every state block has its own list before constrain_component() runs.
29 object.__setattr__(blk, "vars_to_deactivate", [])
31 def constrain(blk, name: str, value: float) -> Constraint | Var | None:
32 """constrain a component by name to a value"""
33 var = getattr(blk, name)
34 return blk.constrain_component(var, value)
36 def constrain_component(blk, component: Var | Expression, value: float) -> Var | None:
37 """
38 Constrain a component to a value
39 """
40 try:
41 variable = _convert_expression_to_var(component)
42 except ExpressionConversionError:
43 variable = component # already a Var, just fix it directly
45 variable.fix(value)
47 if isinstance(variable, IndexedVar):
48 for i in variable.index_set():
49 # direct dictionary access avoids intercepted attribute resolution
50 blk.__dict__["vars_to_deactivate"].append(variable[i])
51 else:
52 blk.__dict__["vars_to_deactivate"].append(variable)
54 return variable
56 def add_extra_expressions(blk):
57 """
58 IDAES state blocks don't support all the properties
59 we need, so we add some extra expressions here.
61 This method can be overridden in a subclass to add
62 additional expressions specific to the property package.
63 """
64 if not hasattr(blk, "enth_mass"):
65 blk.add_component("enth_mass", Expression(expr=(blk.flow_mol * blk.enth_mol) / blk.flow_mass))
66 if not hasattr(blk, "entr_mass"):
67 blk.add_component("entr_mass", Expression(expr=(blk.flow_mol * blk.entr_mol) / blk.flow_mass))
68 if not hasattr(blk, "entr_mol"):
69 blk.add_component("entr_mol", Expression(expr=(blk.flow_mol * blk.entr_mass) / blk.flow_mass))
70 if not hasattr(blk, "total_energy_flow"):
71 blk.add_component("total_energy_flow", Expression(expr=blk.flow_mass * blk.enth_mass))
74def _convert_expression_to_var(expr: ScalarExpression | IndexedExpression):
75 if isinstance(expr, ScalarExpression) or isinstance(expr, ExpressionData):
76 var = Var(units=pyo.units.get_units(expr))
77 constraint = Constraint(expr= var == expr)
78 elif isinstance(expr, IndexedExpression):
79 var = Var(expr.index_set(), units=pyo.units.get_units(expr.units))
80 def rule(b, i):
81 return var[i] == expr[i]
82 constraint = Constraint(expr= rule)
83 else:
84 raise ExpressionConversionError(f"Expression {expr} is not a ScalarExpression or IndexedExpression: {type(expr)}")
85 block = expr.parent_block()
86 block.add_component(f"{expr.local_name}_var", var)
87 block.add_component(f"{expr.local_name}_constraint", constraint)
88 return var
91def _solve_block(self, solve_log, init_log, opt, step_name):
92 skip_solve = True # skip solve if only state variables are present
93 for k in self.keys():
94 if number_unfixed_variables(self[k]) != 0:
95 skip_solve = False
97 if not skip_solve:
98 # Initialize properties
99 with idaeslog.solver_log(solve_log, idaeslog.DEBUG) as slc:
100 results = solve_indexed_blocks(opt, [self], tee=slc.tee)
101 init_log.info_high(
102 f"Property initialization {step_name}: {idaeslog.condition(results)}"
103 )
105 if (not skip_solve) and (not check_optimal_termination(results)):
106 raise InitializationError(
107 f"{self.name} {step_name} failed to initialize successfully. Please "
108 f"check the output logs for more information."
109 )
112class FlexibleStateBlock(StateBlock):
113 """
114 This adds some additional methods for handling extra variables that have been fixed with constrain_component()
115 during initialization. These wrapper methods can be called before and after the normal initialization routine
116 to handle deactivating and reactivating constraints added with constrain_component().
117 """
119 def _deactivate_additional_constraints(self):
120 # Temporarily deactivate platform constraints added with
121 # StateBlockConstraints.constrain()) so they don't interfere with
122 # initialization.
123 deactivated_vars: list[tuple[ScalarVar,float]] = []
125 for k in self.keys():
126 blk = self[k]
128 for var in blk.__dict__["vars_to_deactivate"]:
129 var.unfix()
130 # Store the original value so we can reactivate and fix back to the original value later.
131 deactivated_vars.append((var, var.value))
133 self.deactivated_vars = deactivated_vars
135 def _reactivate_additional_constraints(self):
136 for var, value in self.deactivated_vars:
137 var.fix(value)
139 def pre_initialize(self, *args, **kwargs):
140 """
141 Deactivate any additional constraints added with StateBlockConstraints.constrain() during initialization.
142 Run the normal initialisation routine.
143 Reactivate the constraints and solve again to ensure they are satisfied.
144 If hold_state is True, restore the state to what it was
145 """
146 self._deactivate_additional_constraints()
149 def post_initialize(self,flags,
150 state_args=None,
151 state_vars_fixed=False,
152 hold_state=False,
153 outlvl=idaeslog.NOTSET,
154 solver=None,
155 optarg=None):
157 # Get loggers
158 init_log = idaeslog.getInitLogger(self.name, outlvl, tag="properties")
159 solve_log = idaeslog.getSolveLogger(self.name, outlvl, tag="properties")
160 # Set solver and options
161 opt = get_solver(solver, optarg)
163 if state_vars_fixed is False:
164 if hold_state:
165 # temporarily release state so we can solve with constraints reactivated, then switch back to original state after solve
166 self.release_state(flags)
168 if degrees_of_freedom(self) == 0:
169 # We want to solve again, with any constraints reactivated, to ensure that the state variables have the correct values.
170 _solve_block(self, solve_log, init_log, opt, step_name="Second solve with constraints reactivated")
172 # If input block, return flags, else release state
173 if state_vars_fixed is False:
174 if hold_state is True:
175 # Switch back to state vars fixed
176 self._deactivate_additional_constraints()
177 flags = fix_state_vars(self, state_args)
178 return flags
181 def post_release_state(self):
182 # Reactivate platform constraints that were deferred during initialize
183 self._reactivate_additional_constraints()