Coverage for backend/idaes_factory/idaes_factory.py: 88%

154 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1import traceback 

2 

3from opentelemetry import trace 

4from CoreRoot import settings 

5from common.models.idaes.unit_model_schema import SolvedPropertyValueSchema 

6import dotenv 

7from typing import Any 

8from django.db import transaction 

9from common.models.idaes.scenario_schema import UnfixedVariableSchema, OptimizationSchema 

10from core.auxiliary.models.Scenario import Scenario, OptimizationDegreesOfFreedom 

11from core.exceptions import DetailedException 

12from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

13from common.models.idaes import FlowsheetSchema 

14from core.auxiliary.models.PropertyInfo import PropertyInfo 

15from core.auxiliary.models.PropertyValue import PropertyValue 

16from core.auxiliary.models.Solution import Solution 

17from core.auxiliary.enums.unitsLibrary import units_library 

18from .adapters import arc_adapter 

19from .adapters.convert_expression import convert_expression 

20from .idaes_factory_context import IdaesFactoryContext, LiveSolveParams 

21from .queryset_lookup import get_value_object 

22from .unit_conversion import convert_value 

23from idaes_factory.unit_conversion.unit_conversion import can_convert 

24from idaes_factory.adapters.adapter_library import adapters 

25from core.auxiliary.models.Scenario import Scenario, SolverOptionEnum 

26 

27 

28dotenv.load_dotenv() 

29 

30# Todo: replace these with literal types from the Compounds/PP library 

31Compound = str 

32PropertyPackage = str 

33 

34 

35class IdaesFactoryBuildException(DetailedException): 

36 pass 

37 

38tracer = trace.get_tracer(settings.OPEN_TELEMETRY_TRACER_NAME) 

39 

40class IdaesFactory: 

41 """ 

42 The IdaesFactory class is the core class for building 

43 a flowsheet (JSON schema) that can be sent to the IDAES 

44 solver, and for storing the results back in the database. 

45 """ 

46 

47 def __init__( 

48 self, flowsheet_id: int, 

49 scenario: Scenario = None, 

50 require_variables_fixed: bool = True, 

51 solve_index: int | None = None, 

52 

53 ) -> None: 

54 """Prepare a factory capable of serialising the requested flowsheet. 

55 

56 Args: 

57 flowsheet_id: Identifier of the flowsheet to serialise. 

58 scenario: Optional scenario providing solve configuration settings. 

59 require_variables_fixed: Whether adapters should enforce fixed variables. 

60 solve_index: Optional multi-steady-state index to bind to the context. 

61 """ 

62 

63 self.solve_index = solve_index 

64 self.scenario = scenario 

65 

66 if scenario is not None: 

67 is_dynamic = scenario.enable_dynamics 

68 step_size = scenario.simulation_length / float(scenario.num_time_steps) 

69 enable_rating = scenario.enable_rating 

70 else: 

71 is_dynamic = False 

72 step_size = 1 # Just need a placeholder value. 

73 enable_rating = False 

74 

75 time_steps = list([ int(i) * step_size for i in range(0,scenario.num_time_steps)]) if is_dynamic else [0] 

76 

77 self.flowsheet = FlowsheetSchema( 

78 id=flowsheet_id, 

79 dynamic=is_dynamic, 

80 time_set=time_steps, 

81 property_packages=[], 

82 unit_models=[], 

83 arcs=[], 

84 expressions=[], 

85 optimizations=[], 

86 is_rating_mode=enable_rating, 

87 disable_initialization=getattr(scenario, "disable_initialization", False), 

88 solver_option = getattr(scenario, "solver_option", "ipopt"), 

89 ) 

90 

91 # factory context 

92 self.context = IdaesFactoryContext( 

93 flowsheet_id, 

94 require_variables_fixed=require_variables_fixed, 

95 solve_index=solve_index, 

96 time_steps=time_steps, 

97 time_step_size=step_size, 

98 scenario=scenario, 

99 ) 

100 

101 # Updates the context to use a different solve index. 

102 # build() should be called after this to update the extracted flowsheet data. 

103 def use_with_solve_index(self, solve_index: int) -> None: 

104 """Rebind the factory to a different multi steady-state solve index. 

105 

106 Args: 

107 solve_index: Index of the solve configuration within the scenario. 

108 """ 

109 self.solve_index = solve_index 

110 self.context.update_solve_index(self.solve_index) 

111 

112 @tracer.start_as_current_span("build_flowsheet") 

113 def build(self): 

114 """Populate the flowsheet schema with units, arcs, expressions, and metadata. 

115 

116 Raises: 

117 IdaesFactoryBuildException: If any adapter fails during serialisation. 

118 """ 

119 try: 

120 self.setup_unit_models() 

121 self.create_arcs() 

122 self.add_property_packages() 

123 self.add_expressions() 

124 self.add_optimizations() 

125 except Exception as e: 

126 raise IdaesFactoryBuildException(e, "idaes_factory_build") from e 

127 

128 def clear_flowsheet(self) -> None: 

129 """Reset the in-memory flowsheet while preserving configuration metadata.""" 

130 self.flowsheet = FlowsheetSchema( 

131 id=self.flowsheet.id, 

132 dynamic=self.flowsheet.dynamic, 

133 time_set=self.flowsheet.time_set, 

134 property_packages=[], 

135 unit_models=[], 

136 arcs=[], 

137 expressions=[], 

138 optimizations=[], 

139 is_rating_mode=self.flowsheet.is_rating_mode, 

140 disable_initialization=self.flowsheet.disable_initialization, 

141 solver_option=self.flowsheet.solver_option 

142 ) 

143 

144 def add_property_packages(self) -> None: 

145 """Attach any property packages collected during context loading.""" 

146 self.flowsheet.property_packages = self.context.property_packages 

147 

148 def setup_unit_models(self): 

149 """Serialise all unit operations.""" 

150 # add all unit models 

151 exclude = {"stream", "group", "recycle", "specificationBlock", "energy_stream","ac_stream", "humid_air_stream", "transformer_stream"} 

152 for unit_model in self.context.exclude_object_type(exclude): 

153 self.add_unit_model(unit_model) 

154 

155 def add_unit_model(self, unit_model: SimulationObject) -> None: 

156 """Serialise and append a unit model using its registered adapter. 

157 

158 Args: 

159 unit_model: Simulation object to convert into IDAES schema. 

160 

161 Raises: 

162 Exception: If the adapter fails to serialise the unit model. 

163 """ 

164 try: 

165 adapter = adapters[unit_model.objectType] 

166 schema = adapter.serialise(self.context, unit_model) 

167 self.flowsheet.unit_models.append(schema) 

168 except Exception as e: 

169 raise Exception( 

170 f"Error adding unit model {unit_model.componentName} to the flowsheet: {e}" 

171 ) 

172 

173 def add_expressions(self) -> None: 

174 """Collect custom property expressions and expose them on the flowsheet.""" 

175 # expressions are stored in the property set of a group 

176 # eg. the global base flowsheet object 

177 simulation_object: SimulationObject 

178 for simulation_object in self.context.exclude_object_type({"machineLearningBlock"}): 

179 # skip machine learning blocks, their properties are handled differently. We still need to support them in future. 

180 

181 properties = simulation_object.properties 

182 prop: PropertyInfo 

183 for prop in properties.ContainedProperties.all(): 

184 if prop.key in simulation_object.schema.properties: 

185 # This is a default property, we have already processed it. 

186 # We only want to capture custom properties 

187 continue 

188 property_value = get_value_object(prop) 

189 self.flowsheet.expressions.append( 

190 { 

191 "id": property_value.id, 

192 "name": prop.displayName, 

193 "expression": convert_expression(property_value.formula), 

194 } 

195 ) 

196 

197 def add_optimizations(self) -> None: 

198 """Serialise scenario-level optimisation settings onto the flowsheet.""" 

199 # This method was originally written to return multiple optimisations. 

200 # this doesn't make sense, but idaes_service hasn't been updated to only expect one. 

201 # so for now, it sets optimisations to an array with one item 

202 optimization = self.context.scenario 

203 if optimization is None or optimization.enable_optimization is False: 203 ↛ 206line 203 didn't jump to line 206 because the condition on line 203 was always true

204 # no optimization to add 

205 return 

206 sense = "minimize" if optimization.minimize else "maximize" 

207 print(optimization.objective) 

208 if optimization.objective is None: 

209 raise ValueError("Please set an objective for the optimization to minimize or maximize.") 

210 objective = get_value_object(optimization.objective) 

211 

212 degrees_of_freedom = [] 

213 degree_of_freedom: OptimizationDegreesOfFreedom 

214 for degree_of_freedom in optimization.degreesOfFreedom.all(): 

215 property_value_id = degree_of_freedom.propertyValue_id 

216 

217 dof_schema = UnfixedVariableSchema( 

218 id=property_value_id, 

219 lower_bound=degree_of_freedom.lower_bound, 

220 upper_bound=degree_of_freedom.upper_bound 

221 ) 

222 degrees_of_freedom.append(dof_schema) 

223 

224 self.flowsheet.optimizations.append(OptimizationSchema( 

225 objective=objective.id, 

226 sense=sense, 

227 unfixed_variables=degrees_of_freedom, 

228 )) 

229 

230 def create_arcs(self): 

231 """Serialise stream-like objects into arc connections for the flowsheet.""" 

232 streams = self.context.filter_object_type({"stream", "energy_stream","ac_stream", "humid_air_stream"}) 

233 for stream in streams: 

234 arc_schema = arc_adapter.create_arc(self.context, stream) 

235 

236 if arc_schema is not None: 

237 self.flowsheet.arcs.append(arc_schema) 

238 

239 

240# noinspection PyUnreachableCode 

241def store_properties_schema( 

242 properties_schema: list[SolvedPropertyValueSchema], 

243 flowsheet_id: int, 

244 scenario_id: int | None = None, 

245 solve_index: int | None = None 

246) -> None: 

247 """Persist solved property values and dynamic results to the database. 

248 

249 Args: 

250 properties_schema: Collection of property payloads returned by IDAES. 

251 flowsheet_id: Identifier of the flowsheet whose properties were solved. 

252 scenario_id: Optional scenario identifier associated with the solve. 

253 solve_index: Multi-steady-state index for the stored values, if any. 

254 """ 

255 properties_schema = [ 

256 SolvedPropertyValueSchema(**prop) if isinstance(prop, dict) else prop 

257 for prop in properties_schema 

258 ] 

259 # create a id->property map 

260 ids = [prop.id for prop in properties_schema] 

261 property_values = PropertyValue.objects.filter(id__in=ids).select_related( 

262 "property" 

263 ) 

264 prop_map = {prop.id: prop for prop in property_values} 

265 

266 property_values = [] 

267 property_infos = [] 

268 dynamic_results = [] 

269 

270 for prop_schema in properties_schema: 

271 prop = prop_map.get(prop_schema.id, None) 

272 if prop is None: 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true

273 raise Exception(f"Property {prop_schema.id} not found in the database.") 

274 

275 property_info: PropertyInfo = prop.property 

276 updated_value = prop_schema.value 

277 

278 updated_value = prop_schema.value 

279 

280 from_unit = prop_schema.unit 

281 

282 is_multi_steady_state = solve_index is not None 

283 is_dynamics = scenario_id != None and isinstance(updated_value, list) and len(updated_value) > 1 

284 

285 # If we're doing MSS or dynamics, we need to create associated dynamic results. 

286 if is_multi_steady_state or is_dynamics: 

287 dynamic_result = Solution( 

288 property=prop, 

289 flowsheet_id=flowsheet_id, 

290 solve_index=solve_index, 

291 scenario_id=scenario_id 

292 ) 

293 

294 # If we're dealing with dynamics, the updated_value is a list of values. 

295 # Otherwise, it's a single scalar value. 

296 dynamic_result.values = (updated_value if isinstance(updated_value, list) 

297 else [updated_value]) 

298 dynamic_results.append(dynamic_result) 

299 continue 

300 

301 # else we can continue as normal 

302 if prop_schema.unknown_units and not can_convert( 

303 from_unit, property_info.unit 

304 ): 

305 # we don't know the category of unit_type this unit is in! 

306 # default to "unknown" with a custom unit 

307 property_info.unitType = "unknown" 

308 property_info.unit = from_unit 

309 to_unit = from_unit 

310 # try to find the unit_type by looping through all 

311 # the units library and checking the first unit in the unit_type 

312 # to see if it can be converted 

313 for unit_type in units_library.keys(): 

314 default_unit = units_library[unit_type][0]["value"] 

315 if can_convert(from_unit, default_unit): 

316 # update the unitType 

317 property_info.unitType = unit_type 

318 property_info.unit = default_unit 

319 to_unit = default_unit 

320 break 

321 property_infos.append(property_info) 

322 else: 

323 to_unit = property_info.unit 

324 

325 # TODO: better handling of multi-dimensional indexed properties. 

326 if isinstance(updated_value, list): 

327 val = updated_value[0] 

328 else: 

329 val = updated_value 

330 

331 new_value = convert_value(val, from_unit=from_unit, to_unit=to_unit) 

332 prop.value = new_value 

333 property_values.append(prop) 

334 

335 with transaction.atomic(): 

336 # save the property values 

337 PropertyValue.objects.bulk_update(property_values, ["value"]) 

338 Solution.objects.bulk_create( 

339 dynamic_results, 

340 update_conflicts=True, 

341 update_fields=["values"], 

342 unique_fields=["pk"], 

343 ) 

344 

345 # save the property infos 

346 PropertyInfo.objects.bulk_update(property_infos, ["unitType", "unit"]) 

347 

348 

349def save_all_initial_values(unit_models: dict[str, Any]) -> None: 

350 """Persist initial values returned from IDAES for each unit model. 

351 

352 Args: 

353 unit_models: Mapping of unit model ids to serialised initial value payloads. 

354 """ 

355 simulation_objects = { unit_op.id: unit_op for unit_op in (SimulationObject.objects 

356 .filter(id__in=unit_models.keys()) 

357 .only("id", "initial_values") 

358 )} 

359 

360 for unit_model_id, unit_model in unit_models.items(): 

361 simulation_object = simulation_objects[int(unit_model_id)] 

362 initial_values = unit_model 

363 simulation_object.initial_values = initial_values 

364 

365 SimulationObject.objects.bulk_update(simulation_objects.values(), ["initial_values"])