Coverage for backend/ahuora-builder/src/ahuora_builder/methods/adapter.py: 90%

159 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-03-26 20:57 +0000

1import traceback 

2from typing import Any 

3 

4from pyomo.environ import Block, value as pyo_value, Component, Var, ScalarVar, Reference, Expression 

5from pyomo.core.base.expression import ScalarExpression 

6from pyomo.core.base.constraint import ScalarConstraint 

7from pyomo.core.base.units_container import units 

8from idaes.core import FlowsheetBlock 

9from ahuora_builder.properties_manager import PropertiesManager 

10from ahuora_builder_types.unit_model_schema import SolvedPropertyValueSchema 

11from .units_handler import attach_unit, get_attached_unit, get_attached_unit_str, ValueWithUnits 

12from ahuora_builder_types import PropertiesSchema, PropertySchema 

13from typing import Dict 

14from pyomo.core.base.indexed_component import UnindexedComponent_set, IndexedComponent 

15from pyomo.core.base.indexed_component_slice import ( 

16 IndexedComponent_slice, 

17) 

18import numpy as np 

19from ahuora_builder_types.id_types import PropertyValueId 

20 

21def get_component(blk: Block, key: str): 

22 """ 

23 Get a component from a block, given a key. Doesn't handle indexes. 

24 """ 

25 try: 

26 # allow key to be split by "." to access nested properties eg. "hot_side.deltaP" 

27 key_split = key.split(".") 

28 b = blk 

29 for k in key_split: 

30 b = getattr(b, k) 

31 return b 

32 except AttributeError: 

33 raise ValueError( 

34 f"Property {key} not found in block `{blk}`. " 

35 f"Available properties are {[x for x in blk.component_map().keys()]}" 

36 ) 

37 

38def add_to_property_map(vars: IndexedComponent, id: PropertyValueId, fs): 

39 try: 

40 name = next(vars.values()).name # maybe we should deprectate the name? 

41 fs.properties_map.add(id, vars, name) 

42 except StopIteration: 

43 pass # The only reason this would happen is if there are no values in the indexed component, e.g in milk solids, there is no gas phase. In these case, we just skip adding to the property map.  

44 #return c 

45 

46def add_corresponding_constraint(fs: FlowsheetBlock,c, id: PropertyValueId): 

47 fs.properties_map.add_constraint(id, c) 

48 

49 

50def soft_cast_float(value: Any) -> float: 

51 """ 

52 Softly cast a value to float, returning None if the value is None or cannot be cast. 

53 This is needed because not everything is indexed by time at all. 

54 """ 

55 try: 

56 return float(value) 

57 except (ValueError, TypeError): 

58 return None 

59 

60def items_by_time(s: Dict[str, Any]) -> list[tuple[str, Any]]: 

61 """ 

62 Converts a dictionary of items indexed by time (as strings) to a list of tuples, and fixes the ordering. 

63 This is because if "11" is ordered before "2" in a dictionary of strings, but we want the results ordered by time. 

64 """ 

65 return sorted(s.items(), key=lambda x: soft_cast_float(x[0])) # sort by time index 

66 

67 

68 

69def get_index_set_shape(component: IndexedComponent) -> tuple[int,...]: 

70 index_set = component.index_set() 

71 if index_set.dimen == 0: 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true

72 return () # I don't think we ever actually have this case (we early return if there are no items in the index set with Warning: No variables found) 

73 elif index_set.dimen == 1: 73 ↛ 75line 73 didn't jump to line 75 because the condition on line 73 was always true

74 return (len(index_set),) 

75 elif index_set.dimen > 1: 

76 shape = [len(s) for s in index_set.subsets()] 

77 return tuple(shape) 

78 

79 

80def serialize_properties_map(fs: FlowsheetBlock) -> list[SolvedPropertyValueSchema]: 

81 properties : list[SolvedPropertyValueSchema] = [] 

82 # TODO: collate properties by timestep. 

83 properties_map: PropertiesManager = fs.properties_map 

84 for (uid, s) in properties_map.items(): 

85 if uid == -1: 

86 # skip 

87 continue 

88 s.component 

89 

90 shape = get_index_set_shape(s.component) 

91 

92 values = [pyo_value(c) for c in s.component.values()] 

93 

94 if shape == (): 94 ↛ 96line 94 didn't jump to line 96 because the condition on line 94 was never true

95 # not indexed at all, just return the value (not as an array) 

96 items = values[0] 

97 else: 

98 # Convert to an ndarray 

99 items = np.array(values).tolist() 

100 

101 property_dict = SolvedPropertyValueSchema( 

102 id=uid, 

103 name=s.name, # for debugging 

104 value=items, 

105 unit=get_attached_unit_str(s.component), 

106 ) 

107 if s.unknown_units: 

108 property_dict.unknown_units = s.unknown_units 

109 

110 properties.append(property_dict) 

111 

112 return properties 

113 

114def slice_is_indexed(blk: IndexedComponent | IndexedComponent_slice) -> bool: 

115 """ 

116 Check if a block, variable, or expression is not indexed. 

117 """ 

118 if isinstance(blk, IndexedComponent_slice): 

119 # IndexedComponent_slice doesn't have is_indexed() method. 

120 # Instead, it runs is_indexed() on each of the underlying components. 

121 # We can assume that they are all the same, and just check the first one. 

122 return blk.is_indexed()[0] 

123 return blk.is_indexed() 

124 

125def slice_index_dimen(blk: IndexedComponent | IndexedComponent_slice) -> int: 

126 """ 

127 Get the dimension of the index set of a block, variable, or expression. 

128 """ 

129 if isinstance(blk, IndexedComponent_slice): 

130 # IndexedComponent_slice doesn't have index_set() method. 

131 # Instead, it runs index_set() on each of the underlying components. 

132 # We can assume that they are all the same, and just check the first one. 

133 return blk.index_set()[0].dimen 

134 return blk.index_set().dimen 

135 

136 

137def get_sliced_version(block: Block | Var | Expression | IndexedComponent_slice) -> Block | Var | Expression | IndexedComponent_slice: 

138 """ 

139 Get a sliced version of a block, variable, or expression. 

140 A very loose way of thinking of a sliced version is it "references all the indexes at the same time". 

141  

142 in a scalar block, you can do scalar_block.property. 

143 in an indexed block, you can't do indexed_block.property, because you need to define the index you're looking at. 

144 However, if you want to define the indexes later, such as in a reference, you can do indexed_block[:].property. 

145 I.e you're creating a slice to all the indexes, and then accessing the property on that slice. 

146 

147 You can't really use a sliced version directly, but you can use it to create a Reference with reference(sliced_version). 

148 It'll put all the indexes back together. 

149 

150 The main advantage is that you can use get_sliced_version on a indexed subattribute of a slice, and it will collate the indexes together. 

151 """ 

152 if not slice_is_indexed(block): 

153 return block 

154 

155 block_dimen = slice_index_dimen(block) 

156 # we want to get a slice to all items in a block, like block[:,:] 

157 # To do this programmatically: 

158 # the ":" is represented by slice(None) 

159 # so for a 2D block, we want (slice(None), slice(None)) 

160 # we can use a tuple comprehension to create this for any dimen 

161 block_slice = block[tuple(slice(None) for _ in range(block_dimen))] 

162 return block_slice 

163 

164 

165def collate_indexes(block: Block, property_key: str) -> IndexedComponent: 

166 """ 

167 Returns a reference to the property with all the indexes together. 

168 This is because some properties have a different way of doing indexes: 

169 e.g  

170 - properties_out[t].temperature -> Reference[t] 

171 - block.heat_duty[t] -> Reference[t] 

172 - properties_out[t].mole_frac_comp[compound] -> Reference[t, compound] 

173 - block.area -> Reference[None] 

174 """ 

175 block_slice = get_sliced_version(block) 

176 # Now we have the sliced version of the block, we can access the property using the property key. 

177 # As we support nested properties, e.g property_key could be "hot_side.deltaP", 

178 # we can use the get_component function rather than just getattr. 

179 block_property = get_component(block_slice, property_key) 

180 if block_property is None: 180 ↛ 181line 180 didn't jump to line 181 because the condition on line 180 was never true

181 raise ValueError(f"Property {property_key} not found in block {block}.") 

182 # Now we have the property, we need to check if it has any indexes, and get a slice to all those indexes. 

183 property_slice = get_sliced_version(block_property) 

184 #property_slice = block_property 

185 # Calling is_indexed() on a property_slice doesn't really make sense, it returns an array of bools. slices are always indexed though so 

186 # that array gets cast to True, so its' kinda okay. 

187 if property_slice.is_indexed(): # and not isinstance(property_slice, IndexedComponent_slice): 

188 # if the property is indexed, we need to return a reference to the slice. 

189 # This will collate all the indexes together. 

190 # Note that we need to special case DerivativeVar, as Reference doesn't support DerivativeVar directly. ( pyomo dae will try differentiate it again.) 

191 return Reference(property_slice, ctype=IndexedComponent) ## if isinstance(block_property,DerivativeVar) else NOTSET 

192 else: # this is not indexed at all lol, so we can just return the property itself. Creating a reference will add a index[None] which is unnecessary. 

193 return property_slice 

194 

195 

196 

197def fix_var(blk: Block, var : ScalarVar | ScalarExpression, value : ValueWithUnits) -> ScalarVar | ScalarConstraint:# 

198 # returns: the expression that a constraint was added for, or the fixed var 

199 if hasattr(blk, "constrain_component"): 

200 constraint = blk.constrain_component(var, value) 

201 return constraint 

202 elif isinstance(var, ScalarExpression): 

203 # This is only used in tests right now I think. Our only expressions are currently in the property packages, and they use the constrain_component method. 

204 constraint = ScalarConstraint(expr=var == value) 

205 blk.add_component(f"{var.name}_constraint", constraint) 

206 return constraint 

207 else: 

208 #print(var) 

209 var.fix(value) 

210 return var 

211 

212def fix_slice(var_slice: IndexedComponent | IndexedComponent_slice, values: list[ValueWithUnits]) -> list[ScalarVar | ScalarConstraint]: 

213 # Fix a slice of a variable to the given values. 

214 # the var_slice should be a Reference or other indexed component, or a scalar variable 

215 # values should be a list of values to fix the variable to. 

216 results: list[ScalarVar | ScalarConstraint] = [] 

217 for var, value in zip(var_slice.values(), values): 

218 blk = var.parent_block() 

219 constraint = fix_var(blk, var, value) 

220 results.append(constraint) 

221 return results 

222 

223 

224def deactivate_components(components: list[ScalarVar | ScalarConstraint ]): 

225 # Deactivate a "PropertyValue" (which may have multiple subcomponents if it's indexed) 

226 for c in components: 

227 deactivate_component(c) 

228 

229def deactivate_component(c: ScalarVar | ScalarConstraint): 

230 # deactivate "guess" variables: fixed for initialisation 

231 # and unfixed for a control constraint 

232 if isinstance(c, ScalarConstraint): 

233 c.deactivate() 

234 else: 

235 c.unfix() 

236 

237def deactivate_fixed_guesses(guess_vars: list[list[ScalarVar | ScalarConstraint]]): 

238 

239 for c in guess_vars: 

240 deactivate_components(c) 

241 

242 

243def load_initial_guess(component: Component, value: float): 

244 # load the initial guess into the component 

245 if isinstance(component, Var): 245 ↛ 246line 245 didn't jump to line 246 because the condition on line 245 was never true

246 component.value = value 

247 

248def load_initial_guesses(components: IndexedComponent, values: list[float]): 

249 for c, v in zip(components.values(), values): 

250 load_initial_guess(c, v) 

251 

252 

253def fix_block( 

254 block: Block, 

255 properties_schema: PropertiesSchema, 

256 fs: FlowsheetBlock, 

257 block_ctx: "BlockContext", 

258) -> None: 

259 """ 

260 Fix the properties of a block based on the properties schema. 

261 

262 Args: 

263 - block: The block to fix the properties of. 

264 - properties_schema: The schema of the properties to fix. 

265 - fs: Used to store the properties in the properties map. 

266 """ 

267 property_key: str 

268 property_info: PropertySchema 

269 for property_key, property_info in properties_schema.items(): 

270 # Key is e.g "enth_mol" 

271 

272 # TODO: Handle transformers 

273 # indexed_data = extract_indexes( 

274 # property_info.data, 

275 # property_info.unit, 

276 # transformers, 

277 # ) 

278 property_reference = collate_indexes(block, property_key) 

279 

280 for property_value in property_info.data: 

281 discrete_indexes = property_value.discrete_indexes or [] 

282 

283 pv_id = property_value.id 

284 num_discrete_indexes = len(discrete_indexes) 

285 num_property_indexes = property_reference.index_set().dimen if property_reference.is_indexed() else 0 

286 num_continuous_indexes = num_property_indexes - num_discrete_indexes # This is the dimension of the property_value.value ndarray. 

287 

288 # We have the convention that all the continuous indexes come first in property_reference, and then the discrete indexes. 

289 # This is what idaes normally does. 

290 

291 

292 # We need to get a slice to the current set of discrete indexes. 

293 if len(discrete_indexes) == 0: 

294 property_slice = property_reference # no indexes to worry about. 

295 else: 

296 property_slice = property_reference[tuple( 

297 list(slice(None) for _ in range(num_continuous_indexes)) + discrete_indexes 

298 )] 

299 # Now property_slice is only indexed by the continuous indexes. 

300 

301 # get a reference to the variable/expression. This will also add an index [None] if it is not indexed at all, i.e no continuous indexes. 

302 variable_references = Reference(property_slice, ctype=IndexedComponent) #Ctype=IndexedComponent avoids problems with DerivativeVar 

303 

304 add_to_property_map(variable_references, pv_id, fs) 

305 

306 # Because both pyomo and numpy flatten arrays with the last index changing fastest, we can just flatten both the index set and the values, and then iterate through them together. 

307 variable_indexes = list(variable_references.index_set()) 

308 

309 if (len(variable_indexes) == 0): 

310 print(f"Warning: No variables found for {property_key} with indexes {discrete_indexes}. This may be expected in the milk property package, which doesn't have a gas phase.") 

311 continue 

312 

313 variable_transformed = variable_references 

314 

315 

316 if property_value.value is not None: 

317 variable_values = np.array([property_value.value]).flatten() # we put the value in an array to handle the case where it is a scalar. 

318 variable_values_with_units = [attach_unit(v, property_info.unit) for v in variable_values] 

319 variable_values_converted = [ 

320 units.convert(v, get_attached_unit(var)) for v, var in zip(variable_values_with_units, variable_references.values()) 

321 ] 

322 

323 #value = units.convert(property_value.value, get_attached_unit(var)) 

324 if property_value.constraint is not None: 

325 # add the constraint to the list of constraints to be added 

326 # at the flowsheet level. The var value should also 

327 # be a guess to maintain the degrees of freedom. 

328 expr = property_value.constraint 

329 fs.constraint_exprs.append((variable_transformed, expr, pv_id)) 

330 if property_value.controlled: 

331 # this is a set point. To maintain degrees of freedom, 

332 # use the manipulated variable as a guess during initialisation. 

333 # this is a weird case because we might have guesses for both the 

334 # manipulated variable and this variable, and we want to end up 

335 # using the formula/constraint. If we were to try use this guess 

336 # by eliminating the manipulated variable guess, we would run into 

337 # degrees of freedom issues at the flowsheet level if elimination 

338 # failed (since it then adds another flowsheet level constraint). 

339 load_initial_guesses(variable_references, variable_values_converted) 

340 else: 

341 # not a set point, use this guess during initialization. 

342 component = fix_slice(variable_references, variable_values_converted) 

343 fs.guess_vars.append(component) 

344 elif property_value.controlled is not None: 

345 # this value is a controlling variable, so don't fix it now 

346 # it should be fixed after initialisation 

347 block_ctx.add_controlled_var(variable_references, variable_values_converted, pv_id, property_value.controlled) 

348 elif property_value.guess: 

349 block_ctx.add_guess_var( variable_references, variable_values_converted, pv_id) 

350 else: 

351 components = fix_slice(variable_references, variable_values_converted) 

352 # add_corresponding_constraint used to take the constraint returned by constrain_component if you are constraining an expression. TODO: Do we need to add this back in? 

353 # from c = fix_var() 

354 add_corresponding_constraint(fs, components, pv_id) 

355 

356 

357 

358from ahuora_builder.methods.BlockContext import BlockContext