Coverage for backend/ahuora-builder/src/ahuora_builder/methods/adapter.py: 90%
159 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-26 20:57 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-26 20:57 +0000
1import traceback
2from typing import Any
4from pyomo.environ import Block, value as pyo_value, Component, Var, ScalarVar, Reference, Expression
5from pyomo.core.base.expression import ScalarExpression
6from pyomo.core.base.constraint import ScalarConstraint
7from pyomo.core.base.units_container import units
8from idaes.core import FlowsheetBlock
9from ahuora_builder.properties_manager import PropertiesManager
10from ahuora_builder_types.unit_model_schema import SolvedPropertyValueSchema
11from .units_handler import attach_unit, get_attached_unit, get_attached_unit_str, ValueWithUnits
12from ahuora_builder_types import PropertiesSchema, PropertySchema
13from typing import Dict
14from pyomo.core.base.indexed_component import UnindexedComponent_set, IndexedComponent
15from pyomo.core.base.indexed_component_slice import (
16 IndexedComponent_slice,
17)
18import numpy as np
19from ahuora_builder_types.id_types import PropertyValueId
21def get_component(blk: Block, key: str):
22 """
23 Get a component from a block, given a key. Doesn't handle indexes.
24 """
25 try:
26 # allow key to be split by "." to access nested properties eg. "hot_side.deltaP"
27 key_split = key.split(".")
28 b = blk
29 for k in key_split:
30 b = getattr(b, k)
31 return b
32 except AttributeError:
33 raise ValueError(
34 f"Property {key} not found in block `{blk}`. "
35 f"Available properties are {[x for x in blk.component_map().keys()]}"
36 )
38def add_to_property_map(vars: IndexedComponent, id: PropertyValueId, fs):
39 try:
40 name = next(vars.values()).name # maybe we should deprectate the name?
41 fs.properties_map.add(id, vars, name)
42 except StopIteration:
43 pass # The only reason this would happen is if there are no values in the indexed component, e.g in milk solids, there is no gas phase. In these case, we just skip adding to the property map.
44 #return c
46def add_corresponding_constraint(fs: FlowsheetBlock,c, id: PropertyValueId):
47 fs.properties_map.add_constraint(id, c)
50def soft_cast_float(value: Any) -> float:
51 """
52 Softly cast a value to float, returning None if the value is None or cannot be cast.
53 This is needed because not everything is indexed by time at all.
54 """
55 try:
56 return float(value)
57 except (ValueError, TypeError):
58 return None
60def items_by_time(s: Dict[str, Any]) -> list[tuple[str, Any]]:
61 """
62 Converts a dictionary of items indexed by time (as strings) to a list of tuples, and fixes the ordering.
63 This is because if "11" is ordered before "2" in a dictionary of strings, but we want the results ordered by time.
64 """
65 return sorted(s.items(), key=lambda x: soft_cast_float(x[0])) # sort by time index
69def get_index_set_shape(component: IndexedComponent) -> tuple[int,...]:
70 index_set = component.index_set()
71 if index_set.dimen == 0: 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true
72 return () # I don't think we ever actually have this case (we early return if there are no items in the index set with Warning: No variables found)
73 elif index_set.dimen == 1: 73 ↛ 75line 73 didn't jump to line 75 because the condition on line 73 was always true
74 return (len(index_set),)
75 elif index_set.dimen > 1:
76 shape = [len(s) for s in index_set.subsets()]
77 return tuple(shape)
80def serialize_properties_map(fs: FlowsheetBlock) -> list[SolvedPropertyValueSchema]:
81 properties : list[SolvedPropertyValueSchema] = []
82 # TODO: collate properties by timestep.
83 properties_map: PropertiesManager = fs.properties_map
84 for (uid, s) in properties_map.items():
85 if uid == -1:
86 # skip
87 continue
88 s.component
90 shape = get_index_set_shape(s.component)
92 values = [pyo_value(c) for c in s.component.values()]
94 if shape == (): 94 ↛ 96line 94 didn't jump to line 96 because the condition on line 94 was never true
95 # not indexed at all, just return the value (not as an array)
96 items = values[0]
97 else:
98 # Convert to an ndarray
99 items = np.array(values).tolist()
101 property_dict = SolvedPropertyValueSchema(
102 id=uid,
103 name=s.name, # for debugging
104 value=items,
105 unit=get_attached_unit_str(s.component),
106 )
107 if s.unknown_units:
108 property_dict.unknown_units = s.unknown_units
110 properties.append(property_dict)
112 return properties
114def slice_is_indexed(blk: IndexedComponent | IndexedComponent_slice) -> bool:
115 """
116 Check if a block, variable, or expression is not indexed.
117 """
118 if isinstance(blk, IndexedComponent_slice):
119 # IndexedComponent_slice doesn't have is_indexed() method.
120 # Instead, it runs is_indexed() on each of the underlying components.
121 # We can assume that they are all the same, and just check the first one.
122 return blk.is_indexed()[0]
123 return blk.is_indexed()
125def slice_index_dimen(blk: IndexedComponent | IndexedComponent_slice) -> int:
126 """
127 Get the dimension of the index set of a block, variable, or expression.
128 """
129 if isinstance(blk, IndexedComponent_slice):
130 # IndexedComponent_slice doesn't have index_set() method.
131 # Instead, it runs index_set() on each of the underlying components.
132 # We can assume that they are all the same, and just check the first one.
133 return blk.index_set()[0].dimen
134 return blk.index_set().dimen
137def get_sliced_version(block: Block | Var | Expression | IndexedComponent_slice) -> Block | Var | Expression | IndexedComponent_slice:
138 """
139 Get a sliced version of a block, variable, or expression.
140 A very loose way of thinking of a sliced version is it "references all the indexes at the same time".
142 in a scalar block, you can do scalar_block.property.
143 in an indexed block, you can't do indexed_block.property, because you need to define the index you're looking at.
144 However, if you want to define the indexes later, such as in a reference, you can do indexed_block[:].property.
145 I.e you're creating a slice to all the indexes, and then accessing the property on that slice.
147 You can't really use a sliced version directly, but you can use it to create a Reference with reference(sliced_version).
148 It'll put all the indexes back together.
150 The main advantage is that you can use get_sliced_version on a indexed subattribute of a slice, and it will collate the indexes together.
151 """
152 if not slice_is_indexed(block):
153 return block
155 block_dimen = slice_index_dimen(block)
156 # we want to get a slice to all items in a block, like block[:,:]
157 # To do this programmatically:
158 # the ":" is represented by slice(None)
159 # so for a 2D block, we want (slice(None), slice(None))
160 # we can use a tuple comprehension to create this for any dimen
161 block_slice = block[tuple(slice(None) for _ in range(block_dimen))]
162 return block_slice
165def collate_indexes(block: Block, property_key: str) -> IndexedComponent:
166 """
167 Returns a reference to the property with all the indexes together.
168 This is because some properties have a different way of doing indexes:
169 e.g
170 - properties_out[t].temperature -> Reference[t]
171 - block.heat_duty[t] -> Reference[t]
172 - properties_out[t].mole_frac_comp[compound] -> Reference[t, compound]
173 - block.area -> Reference[None]
174 """
175 block_slice = get_sliced_version(block)
176 # Now we have the sliced version of the block, we can access the property using the property key.
177 # As we support nested properties, e.g property_key could be "hot_side.deltaP",
178 # we can use the get_component function rather than just getattr.
179 block_property = get_component(block_slice, property_key)
180 if block_property is None: 180 ↛ 181line 180 didn't jump to line 181 because the condition on line 180 was never true
181 raise ValueError(f"Property {property_key} not found in block {block}.")
182 # Now we have the property, we need to check if it has any indexes, and get a slice to all those indexes.
183 property_slice = get_sliced_version(block_property)
184 #property_slice = block_property
185 # Calling is_indexed() on a property_slice doesn't really make sense, it returns an array of bools. slices are always indexed though so
186 # that array gets cast to True, so its' kinda okay.
187 if property_slice.is_indexed(): # and not isinstance(property_slice, IndexedComponent_slice):
188 # if the property is indexed, we need to return a reference to the slice.
189 # This will collate all the indexes together.
190 # Note that we need to special case DerivativeVar, as Reference doesn't support DerivativeVar directly. ( pyomo dae will try differentiate it again.)
191 return Reference(property_slice, ctype=IndexedComponent) ## if isinstance(block_property,DerivativeVar) else NOTSET
192 else: # this is not indexed at all lol, so we can just return the property itself. Creating a reference will add a index[None] which is unnecessary.
193 return property_slice
197def fix_var(blk: Block, var : ScalarVar | ScalarExpression, value : ValueWithUnits) -> ScalarVar | ScalarConstraint:#
198 # returns: the expression that a constraint was added for, or the fixed var
199 if hasattr(blk, "constrain_component"):
200 constraint = blk.constrain_component(var, value)
201 return constraint
202 elif isinstance(var, ScalarExpression):
203 # This is only used in tests right now I think. Our only expressions are currently in the property packages, and they use the constrain_component method.
204 constraint = ScalarConstraint(expr=var == value)
205 blk.add_component(f"{var.name}_constraint", constraint)
206 return constraint
207 else:
208 #print(var)
209 var.fix(value)
210 return var
212def fix_slice(var_slice: IndexedComponent | IndexedComponent_slice, values: list[ValueWithUnits]) -> list[ScalarVar | ScalarConstraint]:
213 # Fix a slice of a variable to the given values.
214 # the var_slice should be a Reference or other indexed component, or a scalar variable
215 # values should be a list of values to fix the variable to.
216 results: list[ScalarVar | ScalarConstraint] = []
217 for var, value in zip(var_slice.values(), values):
218 blk = var.parent_block()
219 constraint = fix_var(blk, var, value)
220 results.append(constraint)
221 return results
224def deactivate_components(components: list[ScalarVar | ScalarConstraint ]):
225 # Deactivate a "PropertyValue" (which may have multiple subcomponents if it's indexed)
226 for c in components:
227 deactivate_component(c)
229def deactivate_component(c: ScalarVar | ScalarConstraint):
230 # deactivate "guess" variables: fixed for initialisation
231 # and unfixed for a control constraint
232 if isinstance(c, ScalarConstraint):
233 c.deactivate()
234 else:
235 c.unfix()
237def deactivate_fixed_guesses(guess_vars: list[list[ScalarVar | ScalarConstraint]]):
239 for c in guess_vars:
240 deactivate_components(c)
243def load_initial_guess(component: Component, value: float):
244 # load the initial guess into the component
245 if isinstance(component, Var): 245 ↛ 246line 245 didn't jump to line 246 because the condition on line 245 was never true
246 component.value = value
248def load_initial_guesses(components: IndexedComponent, values: list[float]):
249 for c, v in zip(components.values(), values):
250 load_initial_guess(c, v)
253def fix_block(
254 block: Block,
255 properties_schema: PropertiesSchema,
256 fs: FlowsheetBlock,
257 block_ctx: "BlockContext",
258) -> None:
259 """
260 Fix the properties of a block based on the properties schema.
262 Args:
263 - block: The block to fix the properties of.
264 - properties_schema: The schema of the properties to fix.
265 - fs: Used to store the properties in the properties map.
266 """
267 property_key: str
268 property_info: PropertySchema
269 for property_key, property_info in properties_schema.items():
270 # Key is e.g "enth_mol"
272 # TODO: Handle transformers
273 # indexed_data = extract_indexes(
274 # property_info.data,
275 # property_info.unit,
276 # transformers,
277 # )
278 property_reference = collate_indexes(block, property_key)
280 for property_value in property_info.data:
281 discrete_indexes = property_value.discrete_indexes or []
283 pv_id = property_value.id
284 num_discrete_indexes = len(discrete_indexes)
285 num_property_indexes = property_reference.index_set().dimen if property_reference.is_indexed() else 0
286 num_continuous_indexes = num_property_indexes - num_discrete_indexes # This is the dimension of the property_value.value ndarray.
288 # We have the convention that all the continuous indexes come first in property_reference, and then the discrete indexes.
289 # This is what idaes normally does.
292 # We need to get a slice to the current set of discrete indexes.
293 if len(discrete_indexes) == 0:
294 property_slice = property_reference # no indexes to worry about.
295 else:
296 property_slice = property_reference[tuple(
297 list(slice(None) for _ in range(num_continuous_indexes)) + discrete_indexes
298 )]
299 # Now property_slice is only indexed by the continuous indexes.
301 # get a reference to the variable/expression. This will also add an index [None] if it is not indexed at all, i.e no continuous indexes.
302 variable_references = Reference(property_slice, ctype=IndexedComponent) #Ctype=IndexedComponent avoids problems with DerivativeVar
304 add_to_property_map(variable_references, pv_id, fs)
306 # Because both pyomo and numpy flatten arrays with the last index changing fastest, we can just flatten both the index set and the values, and then iterate through them together.
307 variable_indexes = list(variable_references.index_set())
309 if (len(variable_indexes) == 0):
310 print(f"Warning: No variables found for {property_key} with indexes {discrete_indexes}. This may be expected in the milk property package, which doesn't have a gas phase.")
311 continue
313 variable_transformed = variable_references
316 if property_value.value is not None:
317 variable_values = np.array([property_value.value]).flatten() # we put the value in an array to handle the case where it is a scalar.
318 variable_values_with_units = [attach_unit(v, property_info.unit) for v in variable_values]
319 variable_values_converted = [
320 units.convert(v, get_attached_unit(var)) for v, var in zip(variable_values_with_units, variable_references.values())
321 ]
323 #value = units.convert(property_value.value, get_attached_unit(var))
324 if property_value.constraint is not None:
325 # add the constraint to the list of constraints to be added
326 # at the flowsheet level. The var value should also
327 # be a guess to maintain the degrees of freedom.
328 expr = property_value.constraint
329 fs.constraint_exprs.append((variable_transformed, expr, pv_id))
330 if property_value.controlled:
331 # this is a set point. To maintain degrees of freedom,
332 # use the manipulated variable as a guess during initialisation.
333 # this is a weird case because we might have guesses for both the
334 # manipulated variable and this variable, and we want to end up
335 # using the formula/constraint. If we were to try use this guess
336 # by eliminating the manipulated variable guess, we would run into
337 # degrees of freedom issues at the flowsheet level if elimination
338 # failed (since it then adds another flowsheet level constraint).
339 load_initial_guesses(variable_references, variable_values_converted)
340 else:
341 # not a set point, use this guess during initialization.
342 component = fix_slice(variable_references, variable_values_converted)
343 fs.guess_vars.append(component)
344 elif property_value.controlled is not None:
345 # this value is a controlling variable, so don't fix it now
346 # it should be fixed after initialisation
347 block_ctx.add_controlled_var(variable_references, variable_values_converted, pv_id, property_value.controlled)
348 elif property_value.guess:
349 block_ctx.add_guess_var( variable_references, variable_values_converted, pv_id)
350 else:
351 components = fix_slice(variable_references, variable_values_converted)
352 # add_corresponding_constraint used to take the constraint returned by constrain_component if you are constraining an expression. TODO: Do we need to add this back in?
353 # from c = fix_var()
354 add_corresponding_constraint(fs, components, pv_id)
358from ahuora_builder.methods.BlockContext import BlockContext