Coverage for backend/django/idaes_factory/idaes_factory.py: 97%

170 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1import traceback 

2 

3from opentelemetry import trace 

4from CoreRoot import settings 

5from ahuora_builder_types.unit_model_schema import SolvedPropertyValueSchema 

6import dotenv 

7from typing import Any 

8from django.db import transaction 

9from ahuora_builder_types.scenario_schema import UnfixedVariableSchema, OptimizationSchema 

10from core.auxiliary.models.Scenario import Scenario, OptimizationDegreesOfFreedom 

11from core.exceptions import DetailedException 

12from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

13from ahuora_builder_types import FlowsheetSchema 

14from core.auxiliary.models.PropertyInfo import PropertyInfo 

15from core.auxiliary.models.PropertyValue import PropertyValue 

16from core.auxiliary.models.Solution import Solution 

17from core.auxiliary.enums.unitsLibrary import units_library 

18from .adapters import arc_adapter 

19from .adapters.convert_expression import convert_expression 

20from .idaes_factory_context import IdaesFactoryContext, LiveSolveParams 

21from .queryset_lookup import get_value_object 

22from .unit_conversion import convert_value 

23from idaes_factory.unit_conversion.unit_conversion import can_convert 

24from core.auxiliary.models.Scenario import Scenario, SolverOptionEnum 

25 

26 

27dotenv.load_dotenv() 

28 

29# Todo: replace these with literal types from the Compounds/PP library 

30Compound = str 

31PropertyPackage = str 

32 

33 

34class IdaesFactoryBuildException(DetailedException): 

35 pass 

36 

37 

38tracer = trace.get_tracer(settings.OPEN_TELEMETRY_TRACER_NAME) 

39 

40 

41class IdaesFactory: 

42 """ 

43 The IdaesFactory class is the core class for building 

44 a flowsheet (JSON schema) that can be sent to the IDAES 

45 solver, and for storing the results back in the database. 

46 """ 

47 

48 def __init__( 

49 self, 

50 group_id: int, 

51 scenario: Scenario | None = None, 

52 require_variables_fixed: bool = True, 

53 solve_index: int | None = None, 

54 

55 ) -> None: 

56 """Prepare a factory capable of serialising the requested flowsheet. 

57 

58 Args: 

59 group_id: Identifier of the flowsheet to serialise. 

60 scenario: Optional scenario providing solve configuration settings. 

61 require_variables_fixed: Whether adapters should enforce fixed variables. 

62 solve_index: Optional multi-steady-state index to bind to the context. 

63 """ 

64 

65 self.solve_index = solve_index 

66 self.scenario = scenario 

67 

68 if scenario is not None: 

69 is_dynamic = scenario.enable_dynamics 

70 step_size = scenario.simulation_length / \ 

71 float(scenario.num_time_steps) 

72 enable_rating = scenario.enable_rating 

73 

74 if scenario.enable_optimization: 

75 # If we are doing optimization, we solve from the root 

76 group_id = scenario.flowsheet.rootGrouping.id 

77 else: 

78 is_dynamic = False 

79 step_size = 1 # Just need a placeholder value. 

80 enable_rating = False 

81 

82 time_steps = list([int(i) * step_size for i in range(0, 

83 scenario.num_time_steps)]) if is_dynamic else [0] 

84 

85 self.flowsheet = FlowsheetSchema( 

86 group_id=group_id, 

87 dynamic=is_dynamic, 

88 time_set=time_steps, 

89 property_packages=[], 

90 unit_models=[], 

91 arcs=[], 

92 expressions=[], 

93 optimizations=[], 

94 is_rating_mode=enable_rating, 

95 disable_initialization=getattr( 

96 scenario, "disable_initialization", False), 

97 skip_initialization_for_units_with_initial_values=getattr( 

98 scenario, 

99 "skip_initialization_for_units_with_initial_values", 

100 False, 

101 ), 

102 solver_option=getattr(scenario, "solver_option", "ipopt"), 

103 ) 

104 

105 # factory context 

106 self.context = IdaesFactoryContext( 

107 group_id, 

108 require_variables_fixed=require_variables_fixed, 

109 solve_index=solve_index, 

110 time_steps=time_steps, 

111 time_step_size=step_size, 

112 scenario=scenario, 

113 ) 

114 

115 # Updates the context to use a different solve index. 

116 # build() should be called after this to update the extracted flowsheet data. 

117 def use_with_solve_index(self, solve_index: int) -> None: 

118 """Rebind the factory to a different multi steady-state solve index. 

119 

120 Args: 

121 solve_index: Index of the solve configuration within the scenario. 

122 """ 

123 self.solve_index = solve_index 

124 self.context.update_solve_index(self.solve_index) 

125 

126 @tracer.start_as_current_span("build_flowsheet") 

127 def build(self): 

128 """Populate the flowsheet schema with units, arcs, expressions, and metadata. 

129 

130 Raises: 

131 IdaesFactoryBuildException: If any adapter fails during serialisation. 

132 """ 

133 try: 

134 self.setup_unit_models() 

135 self.create_arcs() 

136 self.add_property_packages() 

137 self.add_expressions() 

138 self.add_optimizations() 

139 self.check_dependencies() 

140 except Exception as e: 

141 raise IdaesFactoryBuildException(e, "idaes_factory_build") from e 

142 

143 def clear_flowsheet(self) -> None: 

144 """Reset the in-memory flowsheet while preserving configuration metadata.""" 

145 self.flowsheet = FlowsheetSchema( 

146 group_id=self.flowsheet.group_id, 

147 dynamic=self.flowsheet.dynamic, 

148 time_set=self.flowsheet.time_set, 

149 property_packages=[], 

150 unit_models=[], 

151 arcs=[], 

152 expressions=[], 

153 optimizations=[], 

154 is_rating_mode=self.flowsheet.is_rating_mode, 

155 disable_initialization=self.flowsheet.disable_initialization, 

156 skip_initialization_for_units_with_initial_values=( 

157 self.flowsheet.skip_initialization_for_units_with_initial_values 

158 ), 

159 solver_option=self.flowsheet.solver_option 

160 ) 

161 

162 def add_property_packages(self) -> None: 

163 """Attach any property packages collected during context loading.""" 

164 self.flowsheet.property_packages = self.context.property_packages 

165 

166 def setup_unit_models(self): 

167 """Serialise all unit operations.""" 

168 # add all unit models 

169 exclude = {"stream", "group", "recycle", "specificationBlock", 

170 "energy_stream", "ac_stream", "humid_air_stream", "transformer_stream"} 

171 for unit_model in self.context.exclude_object_type(exclude): 

172 self.add_unit_model(unit_model) 

173 

174 def add_unit_model(self, unit_model: SimulationObject) -> None: 

175 """Serialise and append a unit model using its registered adapter. 

176 

177 Args: 

178 unit_model: Simulation object to convert into IDAES schema. 

179 

180 Raises: 

181 Exception: If the adapter fails to serialise the unit model. 

182 """ 

183 try: 

184 adapter = unit_model.schema.idaes_adapter 

185 if adapter is None: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true

186 raise ValueError( 

187 f"No IDAES adapter registered for object type {unit_model.objectType}" 

188 ) 

189 schema = adapter.serialise(self.context, unit_model) 

190 self.flowsheet.unit_models.append(schema) 

191 except Exception as e: 

192 raise Exception( 

193 f"Error adding unit model {unit_model.componentName} to the flowsheet: {e}" 

194 ) 

195 

196 def add_expressions(self) -> None: 

197 """Collect custom property expressions and expose them on the flowsheet.""" 

198 # expressions are stored in the property set of a group 

199 # eg. the global base flowsheet object 

200 simulation_object: SimulationObject 

201 for simulation_object in self.context.exclude_object_type({"machineLearningBlock"}): 

202 # skip machine learning blocks, their properties are handled differently. We still need to support them in future. 

203 

204 properties = simulation_object.properties 

205 prop: PropertyInfo 

206 for prop in properties.ContainedProperties.all(): 

207 if prop.key in simulation_object.schema.properties: 

208 # This is a default property, we have already processed it. 

209 # We only want to capture custom properties 

210 continue 

211 property_value = get_value_object(prop) 

212 self.context.add_property_value_dependency(property_value) 

213 self.flowsheet.expressions.append( 

214 { 

215 "id": property_value.id, 

216 "name": prop.displayName, 

217 "expression": convert_expression(property_value.formula), 

218 } 

219 ) 

220 

221 def add_optimizations(self) -> None: 

222 """Serialise scenario-level optimisation settings onto the flowsheet.""" 

223 # This method was originally written to return multiple optimisations. 

224 # this doesn't make sense, but idaes_service hasn't been updated to only expect one. 

225 # so for now, it sets optimisations to an array with one item 

226 optimization = self.context.scenario 

227 if optimization is None or optimization.enable_optimization is False: 

228 # no optimization to add 

229 return 

230 sense = "minimize" if optimization.minimize else "maximize" 

231 print(optimization.objective) 

232 if optimization.objective is None: 232 ↛ 233line 232 didn't jump to line 233 because the condition on line 232 was never true

233 raise ValueError( 

234 "Please set an objective for the optimization to minimize or maximize.") 

235 objective = get_value_object(optimization.objective) 

236 

237 degrees_of_freedom = [] 

238 degree_of_freedom: OptimizationDegreesOfFreedom 

239 for degree_of_freedom in optimization.degreesOfFreedom.all(): 

240 property_value_id = degree_of_freedom.propertyValue_id 

241 

242 dof_schema = UnfixedVariableSchema( 

243 id=property_value_id, 

244 lower_bound=degree_of_freedom.lower_bound, 

245 upper_bound=degree_of_freedom.upper_bound 

246 ) 

247 degrees_of_freedom.append(dof_schema) 

248 

249 self.flowsheet.optimizations.append(OptimizationSchema( 

250 objective=objective.id, 

251 sense=sense, 

252 unfixed_variables=degrees_of_freedom, 

253 )) 

254 

255 def check_dependencies(self) -> None: 

256 """Verify that all property value dependencies are serialised""" 

257 if self.scenario is not None and self.scenario.enable_optimization is True: 

258 # No need to check since we are serialising everything 

259 return 

260 serialised_property_values = self.context.serialised_property_values 

261 for dependency, prop_values in self.context.property_value_dependencies.items(): 

262 if dependency not in serialised_property_values: 

263 dependency_prop_value = PropertyValue.objects.get(id=dependency) 

264 prop_info_list_str = ", ".join([f"{prop_value.get_simulation_object().componentName}/{prop_value.property.displayName}" for prop_value in prop_values]) 

265 raise Exception(f"Dependency property {dependency_prop_value.get_simulation_object().componentName}/{dependency_prop_value.property.displayName} is not serialised, but is required by properties {prop_info_list_str}.") 

266 

267 def create_arcs(self): 

268 """Serialise stream-like objects into arc connections for the flowsheet.""" 

269 streams = self.context.filter_object_type( 

270 {"stream", "energy_stream", "ac_stream", "humid_air_stream"}) 

271 for stream in streams: 

272 arc_schema = arc_adapter.create_arc(self.context, stream) 

273 

274 if arc_schema is not None: 

275 self.flowsheet.arcs.append(arc_schema) 

276 

277 

278# noinspection PyUnreachableCode 

279def store_properties_schema( 

280 properties_schema: list[SolvedPropertyValueSchema] | None, 

281 flowsheet_id: int, 

282 scenario_id: int | None = None, 

283 solve_index: int | None = None 

284) -> None: 

285 """Persist solved property values and dynamic results to the database. 

286 

287 Args: 

288 properties_schema: Collection of property payloads returned by IDAES. 

289 flowsheet_id: Identifier of the flowsheet whose properties were solved. 

290 scenario_id: Optional scenario identifier associated with the solve. 

291 solve_index: Multi-steady-state index for the stored values, if any. 

292 """ 

293 if not properties_schema: 

294 return 

295 properties_schema = [ 

296 SolvedPropertyValueSchema(**prop) if isinstance(prop, dict) else prop 

297 for prop in properties_schema 

298 ] 

299 # create a id->property map 

300 ids = [prop.id for prop in properties_schema] 

301 property_values = PropertyValue.objects.filter(id__in=ids).select_related( 

302 "property" 

303 ) 

304 prop_map = {prop.id: prop for prop in property_values} 

305 

306 property_values = [] 

307 property_infos = [] 

308 dynamic_results = [] 

309 

310 for prop_schema in properties_schema: 

311 prop = prop_map.get(prop_schema.id, None) 

312 if prop is None: 312 ↛ 313line 312 didn't jump to line 313 because the condition on line 312 was never true

313 raise Exception( 

314 f"Property {prop_schema.id} not found in the database.") 

315 

316 property_info: PropertyInfo = prop.property 

317 updated_value = prop_schema.value 

318 

319 updated_value = prop_schema.value 

320 

321 from_unit = prop_schema.unit 

322 

323 is_multi_steady_state = solve_index is not None 

324 is_dynamics = scenario_id != None and isinstance( 

325 updated_value, list) and len(updated_value) > 1 

326 

327 # If we're doing MSS or dynamics, we need to create associated dynamic results. 

328 if is_multi_steady_state or is_dynamics: 

329 dynamic_result = Solution( 

330 property=prop, 

331 flowsheet_id=flowsheet_id, 

332 solve_index=solve_index, 

333 scenario_id=scenario_id 

334 ) 

335 

336 # If we're dealing with dynamics, the updated_value is a list of values. 

337 # Otherwise, it's a single scalar value. 

338 dynamic_result.values = (updated_value if isinstance(updated_value, list) 

339 else [updated_value]) 

340 dynamic_results.append(dynamic_result) 

341 continue 

342 

343 # else we can continue as normal 

344 if prop_schema.unknown_units and not can_convert( 

345 from_unit, property_info.unit 

346 ): 

347 # we don't know the category of unit_type this unit is in! 

348 # default to "unknown" with a custom unit 

349 property_info.unitType = "unknown" 

350 property_info.unit = from_unit 

351 to_unit = from_unit 

352 # try to find the unit_type by looping through all 

353 # the units library and checking the first unit in the unit_type 

354 # to see if it can be converted 

355 for unit_type in units_library.keys(): 

356 default_unit = units_library[unit_type][0]["value"] 

357 if can_convert(from_unit, default_unit): 

358 # update the unitType 

359 property_info.unitType = unit_type 

360 property_info.unit = default_unit 

361 to_unit = default_unit 

362 break 

363 property_infos.append(property_info) 

364 else: 

365 to_unit = property_info.unit 

366 

367 # TODO: better handling of multi-dimensional indexed properties. 

368 if isinstance(updated_value, list): 

369 val = updated_value[0] 

370 else: 

371 val = updated_value 

372 

373 new_value = convert_value(val, from_unit=from_unit, to_unit=to_unit) 

374 prop.value = new_value 

375 property_values.append(prop) 

376 

377 with transaction.atomic(): 

378 # save the property values 

379 PropertyValue.objects.bulk_update(property_values, ["value"]) 

380 Solution.objects.bulk_create( 

381 dynamic_results, 

382 update_conflicts=True, 

383 update_fields=["values"], 

384 unique_fields=["pk"], 

385 ) 

386 

387 # save the property infos 

388 PropertyInfo.objects.bulk_update(property_infos, ["unitType", "unit"]) 

389 

390 

391def save_all_initial_values(unit_models: dict[str, Any]) -> None: 

392 """Persist initial values returned from IDAES for each unit model. 

393 

394 Args: 

395 unit_models: Mapping of unit model ids to serialised initial value payloads. 

396 """ 

397 simulation_objects = {unit_op.id: unit_op for unit_op in (SimulationObject.objects 

398 .filter(id__in=unit_models.keys()) 

399 .only("id", "initial_values") 

400 )} 

401 

402 for unit_model_id, unit_model in unit_models.items(): 

403 simulation_object = simulation_objects[int(unit_model_id)] 

404 initial_values = unit_model 

405 simulation_object.initial_values = initial_values 

406 

407 SimulationObject.objects.bulk_update( 

408 simulation_objects.values(), ["initial_values"])