Coverage for backend/ahuora-compounds/ahuora_property_packages/base/flexible_state_block.py: 19%

95 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1from pyomo.environ import Constraint, check_optimal_termination 

2from watertap.core.solvers import get_solver 

3from idaes.core.util.initialization import fix_state_vars, solve_indexed_blocks 

4from idaes.core.util.model_statistics import degrees_of_freedom, number_unfixed_variables 

5from idaes.core.util.exceptions import InitializationError 

6import idaes.logger as idaeslog 

7from pyomo.core.base.expression import ScalarExpression, IndexedExpression, Expression, ExpressionData 

8from pyomo.environ import Var, ScalarVar 

9from pyomo.core.base.var import IndexedVar 

10import pyomo.environ as pyo 

11from idaes.core import ( 

12 StateBlock, 

13) 

14 

15class ExpressionConversionError(Exception): 

16 pass 

17 

18 

19class FlexibleStateBlockData(): 

20 

21 def build(blk, *args): 

22 blk.add_extra_expressions() 

23 # We initialise vars_to_deactivate here with __setattr__ instead of doing it  

24 # in the constructor of _SeawaterStateBlockConstraints as blk.vars_to_deactivate = [],  

25 # because on state blocks, missing attributes are not treated like normal Python objects. 

26 # Why the below works is because it bypasses the custom __setattr__ logic (which makes it a metadata)  

27 # on the block and writes directly to the object. 

28 # Also, initializing here guarantees every state block has its own list before constrain_component() runs. 

29 object.__setattr__(blk, "vars_to_deactivate", []) 

30 

31 def constrain(blk, name: str, value: float) -> Constraint | Var | None: 

32 """constrain a component by name to a value""" 

33 var = getattr(blk, name) 

34 return blk.constrain_component(var, value) 

35 

36 def constrain_component(blk, component: Var | Expression, value: float) -> Var | None: 

37 """ 

38 Constrain a component to a value 

39 """ 

40 try: 

41 variable = _convert_expression_to_var(component) 

42 except ExpressionConversionError: 

43 variable = component # already a Var, just fix it directly 

44 

45 variable.fix(value) 

46 

47 if isinstance(variable, IndexedVar): 

48 for i in variable.index_set(): 

49 # direct dictionary access avoids intercepted attribute resolution 

50 blk.__dict__["vars_to_deactivate"].append(variable[i]) 

51 else: 

52 blk.__dict__["vars_to_deactivate"].append(variable) 

53 

54 return variable 

55 

56 def add_extra_expressions(blk): 

57 """ 

58 IDAES state blocks don't support all the properties 

59 we need, so we add some extra expressions here. 

60  

61 This method can be overridden in a subclass to add 

62 additional expressions specific to the property package. 

63 """ 

64 if not hasattr(blk, "enth_mass"): 

65 blk.add_component("enth_mass", Expression(expr=(blk.flow_mol * blk.enth_mol) / blk.flow_mass)) 

66 if not hasattr(blk, "entr_mass"): 

67 blk.add_component("entr_mass", Expression(expr=(blk.flow_mol * blk.entr_mol) / blk.flow_mass)) 

68 if not hasattr(blk, "entr_mol"): 

69 blk.add_component("entr_mol", Expression(expr=(blk.flow_mol * blk.entr_mass) / blk.flow_mass)) 

70 if not hasattr(blk, "total_energy_flow"): 

71 blk.add_component("total_energy_flow", Expression(expr=blk.flow_mass * blk.enth_mass)) 

72 

73 

74def _convert_expression_to_var(expr: ScalarExpression | IndexedExpression): 

75 if isinstance(expr, ScalarExpression) or isinstance(expr, ExpressionData): 

76 var = Var(units=pyo.units.get_units(expr)) 

77 constraint = Constraint(expr= var == expr) 

78 elif isinstance(expr, IndexedExpression): 

79 var = Var(expr.index_set(), units=pyo.units.get_units(expr.units)) 

80 def rule(b, i): 

81 return var[i] == expr[i] 

82 constraint = Constraint(expr= rule) 

83 else: 

84 raise ExpressionConversionError(f"Expression {expr} is not a ScalarExpression or IndexedExpression: {type(expr)}") 

85 block = expr.parent_block() 

86 block.add_component(f"{expr.local_name}_var", var) 

87 block.add_component(f"{expr.local_name}_constraint", constraint) 

88 return var 

89 

90 

91def _solve_block(self, solve_log, init_log, opt, step_name): 

92 skip_solve = True # skip solve if only state variables are present 

93 for k in self.keys(): 

94 if number_unfixed_variables(self[k]) != 0: 

95 skip_solve = False 

96 

97 if not skip_solve: 

98 # Initialize properties 

99 with idaeslog.solver_log(solve_log, idaeslog.DEBUG) as slc: 

100 results = solve_indexed_blocks(opt, [self], tee=slc.tee) 

101 init_log.info_high( 

102 f"Property initialization {step_name}: {idaeslog.condition(results)}" 

103 ) 

104 

105 if (not skip_solve) and (not check_optimal_termination(results)): 

106 raise InitializationError( 

107 f"{self.name} {step_name} failed to initialize successfully. Please " 

108 f"check the output logs for more information." 

109 ) 

110 

111 

112class FlexibleStateBlock(StateBlock): 

113 """ 

114 This adds some additional methods for handling extra variables that have been fixed with constrain_component()  

115 during initialization. These wrapper methods can be called before and after the normal initialization routine  

116 to handle deactivating and reactivating constraints added with constrain_component(). 

117 """ 

118 

119 def _deactivate_additional_constraints(self): 

120 # Temporarily deactivate platform constraints added with 

121 # StateBlockConstraints.constrain()) so they don't interfere with 

122 # initialization. 

123 deactivated_vars: list[tuple[ScalarVar,float]] = [] 

124 

125 for k in self.keys(): 

126 blk = self[k] 

127 

128 for var in blk.__dict__["vars_to_deactivate"]: 

129 var.unfix() 

130 # Store the original value so we can reactivate and fix back to the original value later. 

131 deactivated_vars.append((var, var.value)) 

132 

133 self.deactivated_vars = deactivated_vars 

134 

135 def _reactivate_additional_constraints(self): 

136 for var, value in self.deactivated_vars: 

137 var.fix(value) 

138 

139 def pre_initialize(self, *args, **kwargs): 

140 """ 

141 Deactivate any additional constraints added with StateBlockConstraints.constrain() during initialization. 

142 Run the normal initialisation routine. 

143 Reactivate the constraints and solve again to ensure they are satisfied. 

144 If hold_state is True, restore the state to what it was 

145 """ 

146 self._deactivate_additional_constraints() 

147 

148 

149 def post_initialize(self,flags, 

150 state_args=None, 

151 state_vars_fixed=False, 

152 hold_state=False, 

153 outlvl=idaeslog.NOTSET, 

154 solver=None, 

155 optarg=None): 

156 

157 # Get loggers 

158 init_log = idaeslog.getInitLogger(self.name, outlvl, tag="properties") 

159 solve_log = idaeslog.getSolveLogger(self.name, outlvl, tag="properties") 

160 # Set solver and options 

161 opt = get_solver(solver, optarg) 

162 

163 if state_vars_fixed is False: 

164 if hold_state: 

165 # temporarily release state so we can solve with constraints reactivated, then switch back to original state after solve 

166 self.release_state(flags) 

167 

168 if degrees_of_freedom(self) == 0: 

169 # We want to solve again, with any constraints reactivated, to ensure that the state variables have the correct values. 

170 _solve_block(self, solve_log, init_log, opt, step_name="Second solve with constraints reactivated") 

171 

172 # If input block, return flags, else release state 

173 if state_vars_fixed is False: 

174 if hold_state is True: 

175 # Switch back to state vars fixed 

176 self._deactivate_additional_constraints() 

177 flags = fix_state_vars(self, state_args) 

178 return flags 

179 

180 

181 def post_release_state(self): 

182 # Reactivate platform constraints that were deferred during initialize 

183 self._reactivate_additional_constraints()