Coverage for backend/django/idaes_factory/idaes_factory.py: 97%

167 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-03-26 20:57 +0000

1import traceback 

2 

3from opentelemetry import trace 

4from CoreRoot import settings 

5from ahuora_builder_types.unit_model_schema import SolvedPropertyValueSchema 

6import dotenv 

7from typing import Any 

8from django.db import transaction 

9from ahuora_builder_types.scenario_schema import UnfixedVariableSchema, OptimizationSchema 

10from core.auxiliary.models.Scenario import Scenario, OptimizationDegreesOfFreedom 

11from core.exceptions import DetailedException 

12from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

13from ahuora_builder_types import FlowsheetSchema 

14from core.auxiliary.models.PropertyInfo import PropertyInfo 

15from core.auxiliary.models.PropertyValue import PropertyValue 

16from core.auxiliary.models.Solution import Solution 

17from core.auxiliary.enums.unitsLibrary import units_library 

18from .adapters import arc_adapter 

19from .adapters.convert_expression import convert_expression 

20from .idaes_factory_context import IdaesFactoryContext, LiveSolveParams 

21from .queryset_lookup import get_value_object 

22from .unit_conversion import convert_value 

23from idaes_factory.unit_conversion.unit_conversion import can_convert 

24from idaes_factory.adapters.adapter_library import adapters 

25from core.auxiliary.models.Scenario import Scenario, SolverOptionEnum 

26 

27 

# Pull environment variables in through python-dotenv at import time.
dotenv.load_dotenv()

# TODO: replace these with literal types from the Compounds/PP library
Compound = str
PropertyPackage = str


class IdaesFactoryBuildException(DetailedException):
    """Error type raised when building the IDAES flowsheet schema fails."""


# Module-level tracer used to wrap the flowsheet build in a span.
tracer = trace.get_tracer(settings.OPEN_TELEMETRY_TRACER_NAME)

39 

class IdaesFactory:
    """
    The IdaesFactory class is the core class for building
    a flowsheet (JSON schema) that can be sent to the IDAES
    solver, and for storing the results back in the database.
    """

    def __init__(
        self,
        group_id: int,
        scenario: Scenario | None = None,
        require_variables_fixed: bool = True,
        solve_index: int | None = None,
    ) -> None:
        """Prepare a factory capable of serialising the requested flowsheet.

        Args:
            group_id: Identifier of the flowsheet to serialise.
            scenario: Optional scenario providing solve configuration settings.
            require_variables_fixed: Whether adapters should enforce fixed variables.
            solve_index: Optional multi-steady-state index to bind to the context.
        """
        self.solve_index = solve_index
        self.scenario = scenario

        if scenario is not None:
            is_dynamic = scenario.enable_dynamics
            step_size = scenario.simulation_length / float(scenario.num_time_steps)
            enable_rating = scenario.enable_rating

            if scenario.enable_optimization:
                # If we are doing optimization, we solve from the root
                group_id = scenario.flowsheet.rootGrouping.id
        else:
            is_dynamic = False
            step_size = 1  # Just need a placeholder value.
            enable_rating = False

        # A dynamic solve gets one time point per step; a steady-state solve
        # gets the single time point 0.
        if is_dynamic:
            time_steps = [i * step_size for i in range(scenario.num_time_steps)]
        else:
            time_steps = [0]

        self.flowsheet = FlowsheetSchema(
            group_id=group_id,
            dynamic=is_dynamic,
            time_set=time_steps,
            property_packages=[],
            unit_models=[],
            arcs=[],
            expressions=[],
            optimizations=[],
            is_rating_mode=enable_rating,
            # getattr() supplies the default when scenario is None.
            disable_initialization=getattr(scenario, "disable_initialization", False),
            solver_option=getattr(scenario, "solver_option", "ipopt"),
        )

        # factory context
        self.context = IdaesFactoryContext(
            group_id,
            require_variables_fixed=require_variables_fixed,
            solve_index=solve_index,
            time_steps=time_steps,
            time_step_size=step_size,
            scenario=scenario,
        )

    # Updates the context to use a different solve index.
    # build() should be called after this to update the extracted flowsheet data.
    def use_with_solve_index(self, solve_index: int) -> None:
        """Rebind the factory to a different multi steady-state solve index.

        Args:
            solve_index: Index of the solve configuration within the scenario.
        """
        self.solve_index = solve_index
        self.context.update_solve_index(self.solve_index)

    @tracer.start_as_current_span("build_flowsheet")
    def build(self):
        """Populate the flowsheet schema with units, arcs, expressions, and metadata.

        Raises:
            IdaesFactoryBuildException: If any adapter fails during serialisation.
        """
        try:
            self.setup_unit_models()
            self.create_arcs()
            self.add_property_packages()
            self.add_expressions()
            self.add_optimizations()
            self.check_dependencies()
        except Exception as e:
            raise IdaesFactoryBuildException(e, "idaes_factory_build") from e

    def clear_flowsheet(self) -> None:
        """Reset the in-memory flowsheet while preserving configuration metadata."""
        self.flowsheet = FlowsheetSchema(
            group_id=self.flowsheet.group_id,
            dynamic=self.flowsheet.dynamic,
            time_set=self.flowsheet.time_set,
            property_packages=[],
            unit_models=[],
            arcs=[],
            expressions=[],
            optimizations=[],
            is_rating_mode=self.flowsheet.is_rating_mode,
            disable_initialization=self.flowsheet.disable_initialization,
            solver_option=self.flowsheet.solver_option,
        )

    def add_property_packages(self) -> None:
        """Attach any property packages collected during context loading."""
        self.flowsheet.property_packages = self.context.property_packages

    def setup_unit_models(self):
        """Serialise all unit operations."""
        # Object types in this set are not serialised as unit models here.
        exclude = {
            "stream",
            "group",
            "recycle",
            "specificationBlock",
            "energy_stream",
            "ac_stream",
            "humid_air_stream",
            "transformer_stream",
        }
        for unit_model in self.context.exclude_object_type(exclude):
            self.add_unit_model(unit_model)

    def add_unit_model(self, unit_model: SimulationObject) -> None:
        """Serialise and append a unit model using its registered adapter.

        Args:
            unit_model: Simulation object to convert into IDAES schema.

        Raises:
            Exception: If the adapter fails to serialise the unit model. The
                original error is attached as the cause.
        """
        try:
            adapter = adapters[unit_model.objectType]
            schema = adapter.serialise(self.context, unit_model)
            self.flowsheet.unit_models.append(schema)
        except Exception as e:
            # Chain explicitly so the adapter failure stays visible as the cause.
            raise Exception(
                f"Error adding unit model {unit_model.componentName} to the flowsheet: {e}"
            ) from e

    def add_expressions(self) -> None:
        """Collect custom property expressions and expose them on the flowsheet."""
        # expressions are stored in the property set of a group
        # eg. the global base flowsheet object
        simulation_object: SimulationObject
        # skip machine learning blocks, their properties are handled differently.
        # We still need to support them in future.
        for simulation_object in self.context.exclude_object_type({"machineLearningBlock"}):
            properties = simulation_object.properties
            prop: PropertyInfo
            for prop in properties.ContainedProperties.all():
                if prop.key in simulation_object.schema.properties:
                    # This is a default property, we have already processed it.
                    # We only want to capture custom properties
                    continue
                property_value = get_value_object(prop)
                self.context.add_property_value_dependency(property_value)
                self.flowsheet.expressions.append(
                    {
                        "id": property_value.id,
                        "name": prop.displayName,
                        "expression": convert_expression(property_value.formula),
                    }
                )

    def add_optimizations(self) -> None:
        """Serialise scenario-level optimisation settings onto the flowsheet.

        Raises:
            ValueError: If optimisation is enabled but no objective is set.
        """
        # This method was originally written to return multiple optimisations.
        # this doesn't make sense, but idaes_service hasn't been updated to only expect one.
        # so for now, it sets optimisations to an array with one item
        optimization = self.context.scenario
        # Truthiness check, matching how __init__ reads enable_optimization.
        if optimization is None or not optimization.enable_optimization:
            # no optimization to add
            return
        if optimization.objective is None:
            raise ValueError("Please set an objective for the optimization to minimize or maximize.")

        sense = "minimize" if optimization.minimize else "maximize"
        objective = get_value_object(optimization.objective)

        degree_of_freedom: OptimizationDegreesOfFreedom
        degrees_of_freedom = [
            UnfixedVariableSchema(
                id=degree_of_freedom.propertyValue_id,
                lower_bound=degree_of_freedom.lower_bound,
                upper_bound=degree_of_freedom.upper_bound,
            )
            for degree_of_freedom in optimization.degreesOfFreedom.all()
        ]

        self.flowsheet.optimizations.append(OptimizationSchema(
            objective=objective.id,
            sense=sense,
            unfixed_variables=degrees_of_freedom,
        ))

    def check_dependencies(self) -> None:
        """Verify that all property value dependencies are serialised.

        Raises:
            Exception: If a property value that other properties depend on is
                missing from the context's serialised property values.
        """
        if self.scenario is not None and self.scenario.enable_optimization:
            # No need to check since we are serialising everything
            return
        serialised_property_values = self.context.serialised_property_values
        for dependency, prop_values in self.context.property_value_dependencies.items():
            if dependency in serialised_property_values:
                continue
            dependency_prop_value = PropertyValue.objects.get(id=dependency)
            prop_info_list_str = ", ".join(
                f"{prop_value.get_simulation_object().componentName}/{prop_value.property.displayName}"
                for prop_value in prop_values
            )
            raise Exception(
                f"Dependency property {dependency_prop_value.get_simulation_object().componentName}/{dependency_prop_value.property.displayName} is not serialised, but is required by properties {prop_info_list_str}."
            )

    def create_arcs(self):
        """Serialise stream-like objects into arc connections for the flowsheet."""
        streams = self.context.filter_object_type(
            {"stream", "energy_stream", "ac_stream", "humid_air_stream"}
        )
        for stream in streams:
            arc_schema = arc_adapter.create_arc(self.context, stream)
            # The adapter may return None; only real arcs are appended.
            if arc_schema is not None:
                self.flowsheet.arcs.append(arc_schema)

257 

258 

259# noinspection PyUnreachableCode 

def store_properties_schema(
    properties_schema: list[SolvedPropertyValueSchema],
    flowsheet_id: int,
    scenario_id: int | None = None,
    solve_index: int | None = None
) -> None:
    """Persist solved property values and dynamic results to the database.

    Args:
        properties_schema: Collection of property payloads returned by IDAES.
            Entries given as plain dicts are converted to
            SolvedPropertyValueSchema first.
        flowsheet_id: Identifier of the flowsheet whose properties were solved.
        scenario_id: Optional scenario identifier associated with the solve.
        solve_index: Multi-steady-state index for the stored values, if any.

    Raises:
        Exception: If a returned property id has no matching PropertyValue row.
    """
    properties_schema = [
        SolvedPropertyValueSchema(**prop) if isinstance(prop, dict) else prop
        for prop in properties_schema
    ]
    # create a id->property map
    ids = [prop.id for prop in properties_schema]
    prop_map = {
        prop.id: prop
        for prop in PropertyValue.objects.filter(id__in=ids).select_related("property")
    }

    property_values = []
    property_infos = []
    dynamic_results = []

    # Depends only on the arguments, so it is computed once for the whole batch.
    is_multi_steady_state = solve_index is not None

    for prop_schema in properties_schema:
        prop = prop_map.get(prop_schema.id)
        if prop is None:
            raise Exception(f"Property {prop_schema.id} not found in the database.")

        property_info: PropertyInfo = prop.property
        updated_value = prop_schema.value
        from_unit = prop_schema.unit

        is_dynamics = (
            scenario_id is not None
            and isinstance(updated_value, list)
            and len(updated_value) > 1
        )

        # If we're doing MSS or dynamics, we need to create associated dynamic results.
        if is_multi_steady_state or is_dynamics:
            dynamic_result = Solution(
                property=prop,
                flowsheet_id=flowsheet_id,
                solve_index=solve_index,
                scenario_id=scenario_id
            )
            # If we're dealing with dynamics, the updated_value is a list of values.
            # Otherwise, it's a single scalar value.
            dynamic_result.values = (
                updated_value if isinstance(updated_value, list) else [updated_value]
            )
            dynamic_results.append(dynamic_result)
            continue

        # else we can continue as normal
        if prop_schema.unknown_units and not can_convert(from_unit, property_info.unit):
            # we don't know the category of unit_type this unit is in!
            # default to "unknown" with a custom unit
            property_info.unitType = "unknown"
            property_info.unit = from_unit
            to_unit = from_unit
            # try to find the unit_type by looping through all
            # the units library and checking the first unit in the unit_type
            # to see if it can be converted
            for unit_type, unit_options in units_library.items():
                default_unit = unit_options[0]["value"]
                if can_convert(from_unit, default_unit):
                    # update the unitType
                    property_info.unitType = unit_type
                    property_info.unit = default_unit
                    to_unit = default_unit
                    break
            property_infos.append(property_info)
        else:
            to_unit = property_info.unit

        # TODO: better handling of multi-dimensional indexed properties.
        val = updated_value[0] if isinstance(updated_value, list) else updated_value

        prop.value = convert_value(val, from_unit=from_unit, to_unit=to_unit)
        property_values.append(prop)

    # All three writes share one transaction so a failure leaves no partial result.
    with transaction.atomic():
        # save the property values
        PropertyValue.objects.bulk_update(property_values, ["value"])
        # NOTE(review): these Solution instances are built without a primary
        # key, so a conflict on "pk" looks unlikely to ever trigger the update
        # path - confirm whether a different unique constraint was intended.
        Solution.objects.bulk_create(
            dynamic_results,
            update_conflicts=True,
            update_fields=["values"],
            unique_fields=["pk"],
        )
        # save the property infos
        PropertyInfo.objects.bulk_update(property_infos, ["unitType", "unit"])

366 

367 

def save_all_initial_values(unit_models: dict[str, Any]) -> None:
    """Store the initial values returned from IDAES on each simulation object.

    Args:
        unit_models: Mapping of unit model ids to serialised initial value payloads.
    """
    # Fetch only the columns this function touches.
    queryset = (
        SimulationObject.objects
        .filter(id__in=unit_models.keys())
        .only("id", "initial_values")
    )
    objects_by_id = {obj.id: obj for obj in queryset}

    # Incoming ids are cast with int() to match the integer keys built above.
    for raw_id, payload in unit_models.items():
        objects_by_id[int(raw_id)].initial_values = payload

    SimulationObject.objects.bulk_update(objects_by_id.values(), ["initial_values"])