Coverage for backend/idaes_service/solver/methods/adapter.py: 88%
157 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1import traceback
2from typing import Any
4from pyomo.environ import Block, value as pyo_value, Component, Var, ScalarVar, Reference, Expression
5from pyomo.core.base.expression import ScalarExpression
6from pyomo.core.base.constraint import ScalarConstraint
7from pyomo.core.base.units_container import units
8from idaes.core import FlowsheetBlock
9from idaes_service.solver.properties_manager import PropertiesManager
10from common.models.idaes.unit_model_schema import SolvedPropertyValueSchema
11from .units_handler import attach_unit, get_attached_unit, get_attached_unit_str, ValueWithUnits
12from common.models.idaes import PropertiesSchema, PropertySchema
13from typing import Dict
14from pyomo.core.base.indexed_component import UnindexedComponent_set, IndexedComponent
15from pyomo.core.base.indexed_component_slice import (
16 IndexedComponent_slice,
17)
18import numpy as np
19from common.models.idaes.id_types import PropertyValueId
21def get_component(blk: Block, key: str):
22 """
23 Get a component from a block, given a key. Doesn't handle indexes.
24 """
25 try:
26 # allow key to be split by "." to access nested properties eg. "hot_side.deltaP"
27 key_split = key.split(".")
28 b = blk
29 for k in key_split:
30 b = getattr(b, k)
31 return b
32 except AttributeError:
33 raise ValueError(
34 f"Property {key} not found in block `{blk}`. "
35 f"Available properties are {[x for x in blk.component_map().keys()]}"
36 )
38def add_to_property_map(vars: IndexedComponent, id: PropertyValueId, fs):
39 try:
40 name = next(vars.values()).name # maybe we should deprectate the name?
41 fs.properties_map.add(id, vars, name)
42 except StopIteration:
43 pass # The only reason this would happen is if there are no values in the indexed component, e.g in milk solids, there is no gas phase. In these case, we just skip adding to the property map.
44 #return c
46def add_corresponding_constraint(fs: FlowsheetBlock,c, id: PropertyValueId):
47 fs.properties_map.add_constraint(id, c)
50def soft_cast_float(value: Any) -> float:
51 """
52 Softly cast a value to float, returning None if the value is None or cannot be cast.
53 This is needed because not everything is indexed by time at all.
54 """
55 try:
56 return float(value)
57 except (ValueError, TypeError):
58 return None
60def items_by_time(s: Dict[str, Any]) -> list[tuple[str, Any]]:
61 """
62 Converts a dictionary of items indexed by time (as strings) to a list of tuples, and fixes the ordering.
63 This is because if "11" is ordered before "2" in a dictionary of strings, but we want the results ordered by time.
64 """
65 return sorted(s.items(), key=lambda x: soft_cast_float(x[0])) # sort by time index
69def get_index_set_shape(component: IndexedComponent) -> tuple[int,...]:
70 index_set = component.index_set()
71 if index_set.dimen == 0: 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true
72 return () # I don't think we ever actually have this case (we early return if there are no items in the index set with Warning: No variables found)
73 elif index_set.dimen == 1: 73 ↛ 75line 73 didn't jump to line 75 because the condition on line 73 was always true
74 return (len(index_set),)
75 elif index_set.dimen > 1:
76 shape = [len(s) for s in index_set.subsets()]
77 return tuple(shape)
80def serialize_properties_map(fs: FlowsheetBlock) -> list[SolvedPropertyValueSchema]:
81 properties : list[SolvedPropertyValueSchema] = []
82 # TODO: collate properties by timestep.
83 properties_map: PropertiesManager = fs.properties_map
84 for (uid, s) in properties_map.items():
85 if uid == -1:
86 # skip
87 continue
88 s.component
90 shape = get_index_set_shape(s.component)
92 values = [pyo_value(c) for c in s.component.values()]
94 if shape == (): 94 ↛ 96line 94 didn't jump to line 96 because the condition on line 94 was never true
95 # not indexed at all, just return the value (not as an array)
96 items = values[0]
97 else:
98 # Convert to an ndarray
99 items = np.array(values).reshape(shape).tolist()
101 property_dict = SolvedPropertyValueSchema(
102 id=uid,
103 name=s.name, # for debugging
104 value=items,
105 unit=get_attached_unit_str(s.component),
106 )
107 if s.unknown_units:
108 property_dict.unknown_units = s.unknown_units
110 properties.append(property_dict)
112 return properties
114def slice_is_indexed(blk: IndexedComponent | IndexedComponent_slice) -> bool:
115 """
116 Check if a block, variable, or expression is not indexed.
117 """
118 if isinstance(blk, IndexedComponent_slice):
119 # IndexedComponent_slice doesn't have is_indexed() method.
120 # Instead, it runs is_indexed() on each of the underlying components.
121 # We can assume that they are all the same, and just check the first one.
122 return blk.is_indexed()[0]
123 return blk.is_indexed()
125def slice_index_dimen(blk: IndexedComponent | IndexedComponent_slice) -> int:
126 """
127 Get the dimension of the index set of a block, variable, or expression.
128 """
129 if isinstance(blk, IndexedComponent_slice):
130 # IndexedComponent_slice doesn't have index_set() method.
131 # Instead, it runs index_set() on each of the underlying components.
132 # We can assume that they are all the same, and just check the first one.
133 return blk.index_set()[0].dimen
134 return blk.index_set().dimen
137def get_sliced_version(block: Block | Var | Expression | IndexedComponent_slice) -> Block | Var | Expression | IndexedComponent_slice:
138 """
139 Get a sliced version of a block, variable, or expression.
140 A very loose way of thinking of a sliced version is it "references all the indexes at the same time".
142 in a scalar block, you can do scalar_block.property.
143 in an indexed block, you can't do indexed_block.property, because you need to define the index you're looking at.
144 However, if you want to define the indexes later, such as in a reference, you can do indexed_block[:].property.
145 I.e you're creating a slice to all the indexes, and then accessing the property on that slice.
147 You can't really use a sliced version directly, but you can use it to create a Reference with reference(sliced_version).
148 It'll put all the indexes back together.
150 The main advantage is that you can use get_sliced_version on a indexed subattribute of a slice, and it will collate the indexes together.
151 """
152 if not slice_is_indexed(block):
153 return block
155 block_dimen = slice_index_dimen(block)
156 # we want to get a slice to all items in a block, like block[:,:]
157 # To do this programmatically:
158 # the ":" is represented by slice(None)
159 # so for a 2D block, we want (slice(None), slice(None))
160 # we can use a tuple comprehension to create this for any dimen
161 block_slice = block[tuple(slice(None) for _ in range(block_dimen))]
162 return block_slice
165def collate_indexes(block: Block, property_key: str) -> IndexedComponent:
166 """
167 Returns a reference to the property with all the indexes together.
168 This is because some properties have a different way of doing indexes:
169 e.g
170 - properties_out[t].temperature -> Reference[t]
171 - block.heat_duty[t] -> Reference[t]
172 - properties_out[t].mole_frac_comp[compound] -> Reference[t, compound]
173 - block.area -> Reference[None]
174 """
175 block_slice = get_sliced_version(block)
176 # Now we have the sliced version of the block, we can access the property using the property key.
177 # As we support nested properties, e.g property_key could be "hot_side.deltaP",
178 # we can use the get_component function rather than just getattr.
179 block_property = get_component(block_slice, property_key)
180 if block_property is None: 180 ↛ 181line 180 didn't jump to line 181 because the condition on line 180 was never true
181 raise ValueError(f"Property {property_key} not found in block {block}.")
182 # Now we have the property, we need to check if it has any indexes, and get a slice to all those indexes.
183 property_slice = get_sliced_version(block_property)
184 #property_slice = block_property
185 # Calling is_indexed() on a property_slice doesn't really make sense, it returns an array of bools. slices are always indexed though so
186 # that array gets cast to True, so its' kinda okay.
187 if property_slice.is_indexed(): # and not isinstance(property_slice, IndexedComponent_slice):
188 # if the property is indexed, we need to return a reference to the slice.
189 # This will collate all the indexes together.
190 # Note that we need to special case DerivativeVar, as Reference doesn't support DerivativeVar directly. ( pyomo dae will try differentiate it again.)
191 return Reference(property_slice, ctype=IndexedComponent) ## if isinstance(block_property,DerivativeVar) else NOTSET
192 else: # this is not indexed at all lol, so we can just return the property itself. Creating a reference will add a index[None] which is unnecessary.
193 return property_slice
197def fix_var(blk: Block, var : ScalarVar | ScalarExpression, value : ValueWithUnits) -> ScalarVar | ScalarConstraint:#
198 # returns: the expression that a constraint was added for, or the fixed var
199 if hasattr(blk, "constrain_component"):
200 component = blk.constrain_component(var, value)
201 elif isinstance(var, ScalarExpression): 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true
202 component = var
203 else:
204 #print(var)
205 var.fix(value)
206 component = var
207 return component
209def fix_slice(var_slice: IndexedComponent | IndexedComponent_slice, values: list[ValueWithUnits]) -> list[ScalarVar | ScalarConstraint]:
210 # Fix a slice of a variable to the given values.
211 # the var_slice should be a Reference or other indexed component, or a scalar variable
212 # values should be a list of values to fix the variable to.
213 results: list[ScalarVar | ScalarConstraint] = []
214 for var, value in zip(var_slice.values(), values):
215 blk = var.parent_block()
216 constraint = fix_var(blk, var, value)
217 results.append(constraint)
218 return results
221def deactivate_components(components: list[ScalarVar | ScalarConstraint ]):
222 # Deactivate a "PropertyValue" (which may have multiple subcomponents if it's indexed)
223 for c in components:
224 deactivate_component(c)
226def deactivate_component(c: ScalarVar | ScalarConstraint):
227 # deactivate "guess" variables: fixed for initialisation
228 # and unfixed for a control constraint
229 if isinstance(c, ScalarConstraint): 229 ↛ 230line 229 didn't jump to line 230 because the condition on line 229 was never true
230 c.deactivate()
231 else:
232 c.unfix()
234def deactivate_fixed_guesses(guess_vars: list[list[ScalarVar | ScalarConstraint]]):
236 for c in guess_vars:
237 deactivate_components(c)
240def load_initial_guess(component: Component, value: float):
241 # load the initial guess into the component
242 if isinstance(component, Var): 242 ↛ 243line 242 didn't jump to line 243 because the condition on line 242 was never true
243 component.value = value
245def load_initial_guesses(components: IndexedComponent, values: list[float]):
246 for c, v in zip(components.values(), values):
247 load_initial_guess(c, v)
250def fix_block(
251 block: Block,
252 properties_schema: PropertiesSchema,
253 fs: FlowsheetBlock,
254 block_ctx: "BlockContext",
255) -> None:
256 """
257 Fix the properties of a block based on the properties schema.
259 Args:
260 - block: The block to fix the properties of.
261 - properties_schema: The schema of the properties to fix.
262 - fs: Used to store the properties in the properties map.
263 """
264 property_key: str
265 property_info: PropertySchema
266 for property_key, property_info in properties_schema.items():
267 # Key is e.g "enth_mol"
269 # TODO: Handle transformers
270 # indexed_data = extract_indexes(
271 # property_info.data,
272 # property_info.unit,
273 # transformers,
274 # )
275 property_reference = collate_indexes(block, property_key)
277 for property_value in property_info.data:
278 discrete_indexes = property_value.discrete_indexes or []
280 pv_id = property_value.id
281 num_discrete_indexes = len(discrete_indexes)
282 num_property_indexes = property_reference.index_set().dimen if property_reference.is_indexed() else 0
283 num_continuous_indexes = num_property_indexes - num_discrete_indexes # This is the dimension of the property_value.value ndarray.
285 # We have the convention that all the continuous indexes come first in property_reference, and then the discrete indexes.
286 # This is what idaes normally does.
289 # We need to get a slice to the current set of discrete indexes.
290 if len(discrete_indexes) == 0:
291 property_slice = property_reference # no indexes to worry about.
292 else:
293 property_slice = property_reference[tuple(
294 list(slice(None) for _ in range(num_continuous_indexes)) + discrete_indexes
295 )]
296 # Now property_slice is only indexed by the continuous indexes.
298 # get a reference to the variable/expression. This will also add an index [None] if it is not indexed at all, i.e no continuous indexes.
299 variable_references = Reference(property_slice, ctype=IndexedComponent) #Ctype=IndexedComponent avoids problems with DerivativeVar
301 add_to_property_map(variable_references, pv_id, fs)
303 # Because both pyomo and numpy flatten arrays with the last index changing fastest, we can just flatten both the index set and the values, and then iterate through them together.
304 variable_indexes = list(variable_references.index_set())
306 if (len(variable_indexes) == 0):
307 print("Warning: No variables found for {property_key} with indexes {discrete_indexes}. This may be expected in the milk property package, which doesn't have a gas phase.")
308 continue
310 variable_transformed = variable_references
313 if property_value.value is not None:
314 variable_values = np.array([property_value.value]).flatten() # we put the value in an array to handle the case where it is a scalar.
315 variable_values_with_units = [attach_unit(v, property_info.unit) for v in variable_values]
316 variable_values_converted = [
317 units.convert(v, get_attached_unit(var)) for v, var in zip(variable_values_with_units, variable_references.values())
318 ]
320 #value = units.convert(property_value.value, get_attached_unit(var))
321 if property_value.constraint is not None:
322 # add the constraint to the list of constraints to be added
323 # at the flowsheet level. The var value should also
324 # be a guess to maintain the degrees of freedom.
325 expr = property_value.constraint
326 fs.constraint_exprs.append((variable_transformed, expr, pv_id))
327 if property_value.controlled:
328 # this is a set point. To maintain degrees of freedom,
329 # use the manipulated variable as a guess during initialisation.
330 # this is a weird case because we might have guesses for both the
331 # manipulated variable and this variable, and we want to end up
332 # using the formula/constraint. If we were to try use this guess
333 # by eliminating the manipulated variable guess, we would run into
334 # degrees of freedom issues at the flowsheet level if elimination
335 # failed (since it then adds another flowsheet level constraint).
336 load_initial_guesses(variable_references, variable_values_converted)
337 else:
338 # not a set point, use this guess during initialization.
339 component = fix_slice(variable_references, variable_values_converted)
340 fs.guess_vars.append(component)
341 elif property_value.controlled is not None:
342 # this value is a controlling variable, so don't fix it now
343 # it should be fixed after initialisation
344 block_ctx.add_controlled_var(variable_references, variable_values_converted, pv_id, property_value.controlled)
345 elif property_value.guess:
346 block_ctx.add_guess_var( variable_references, variable_values_converted, pv_id)
347 else:
348 components = fix_slice(variable_references, variable_values_converted)
349 # add_corresponding_constraint used to take the constraint returned by constrain_component if you are constraining an expression. TODO: Do we need to add this back in?
350 # from c = fix_var()
351 add_corresponding_constraint(fs, components, pv_id)
355from idaes_service.solver.methods.BlockContext import BlockContext