Coverage for backend/ahuora-builder/src/ahuora_builder/custom/thermal_utility_systems/header.py: 84%

324 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1"""Header unit model. 

2Implementation by Tim and Keegan, taken from Ahuora-UnitOperations repo""" 

3 

4from typing import List, Optional, Iterable 

5 

6from pyomo.common.config import Bool, ConfigBlock, ConfigValue, In 

7import pyomo.environ as pyo 

8from pyomo.environ import ( 

9 Constraint, 

10 Expression, 

11 Param, 

12 PositiveReals, 

13 RangeSet, 

14 Suffix, 

15 Var, 

16 check_optimal_termination, 

17 value, 

18 units as pyunits, 

19) 

20from pyomo.core.base.reference import Reference 

21 

22from idaes.core import StateBlock, UnitModelBlockData, declare_process_block_class, useDefault 

23from idaes.core.initialization import ModularInitializerBase 

24from idaes.core.solvers import get_solver 

25from idaes.core.util import scaling as iscale 

26from idaes.core.scaling import CustomScalerBase, ConstraintScalingScheme 

27from idaes.core.util.config import is_physical_parameter_block 

28from idaes.core.util.exceptions import InitializationError 

29from idaes.core.util.math import smooth_max, smooth_min 

30from idaes.core.util.model_diagnostics import DiagnosticsToolbox 

31from idaes.core.util.model_statistics import degrees_of_freedom, report_statistics 

32from idaes.core.util.tables import create_stream_table_dataframe 

33 

34import idaes.logger as idaeslog 

35 

36_log = idaeslog.getLogger(__name__) 

37 

38__author__ = "Ahuora Centre for Smart Energy Systems, University of Waikato, New Zealand" 

39 

40 

41def _build_config(config: ConfigBlock) -> None: 

42 """Declare configuration options for the Header unit. 

43 

44 Declares property package references and integer counts for inlets and outlets. 

45 

46 Args: 

47 config (ConfigBlock): The mutable configuration block to populate. 

48 

49 Raises: 

50 ValueError: If invalid option values are provided by the caller (via IDAES). 

51 """ 

52 

53 config.declare( 

54 "property_package", 

55 ConfigValue( 

56 default=useDefault, 

57 domain=is_physical_parameter_block, 

58 description="Property package to use for control volume", 

59 ), 

60 ) 

61 config.declare( 

62 "property_package_args", 

63 ConfigBlock( 

64 implicit=True, 

65 description="Arguments to use for constructing property packages", 

66 ), 

67 ) 

68 config.declare( 

69 "num_inlets", 

70 ConfigValue( 

71 default=1, 

72 domain=In(list(range(1, 100))), 

73 description="Number of utility providers at inlets.", 

74 ), 

75 ) 

76 config.declare( 

77 "num_outlets", 

78 ConfigValue( 

79 default=1, 

80 domain=In(list(range(0, 100))), 

81 description=( 

82 "Number of utility users at outlets. Excludes outlets " 

83 "associated with condensate and vent flows." 

84 ), 

85 ), 

86 ) 

87 config.declare( 

88 "is_liquid_header", 

89 ConfigValue( 

90 default=False, 

91 domain=Bool, 

92 description="Flag for selecting liquid or vapour (including steam and other gases).", 

93 ), 

94 ) 

95 

96 

97@declare_process_block_class("simple_header") 

98class SimpleHeaderData(UnitModelBlockData): 

99 """Thermal utility header unit operation.""" 

100 

101 CONFIG = UnitModelBlockData.CONFIG() 

102 _build_config(CONFIG) 

103 

104 def build(self) -> None: 

105 """Build the unit model structure and equations.""" 

106 super().build() 

107 units_meta = self.config.property_package.get_metadata().get_derived_units 

108 

109 if self.config.num_inlets < 1: 109 ↛ 110line 109 didn't jump to line 110 because the condition on line 109 was never true

110 raise ValueError("Header requires at least one provider (num_inlets >= 1).") 

111 if self.config.num_outlets < 1: 111 ↛ 112line 111 didn't jump to line 112 because the condition on line 111 was never true

112 raise ValueError("Header requires at least one user (num_outlets >= 1).") 

113 

114 self.inlet_list = [f"inlet_{i+1}" for i in range(self.config.num_inlets)] 

115 self.outlet_list = [f"outlet_{i+1}" for i in range(self.config.num_outlets)] + [ 

116 "outlet_condensate", 

117 "outlet_vent", 

118 ] 

119 

120 """ 

121 1. Build inlet, outlet and internal state blocks and associate  

122 with ports (where applicable) 

123 """ 

124 self.inlet_blocks = self._build_state_blocks( 

125 stream_name_list=self.inlet_list, 

126 has_phase_equilibrium=False, 

127 is_defined_state=True, 

128 is_build_port=True, 

129 ) 

130 self._outlet_supply_blocks = self._build_state_blocks( 

131 stream_name_list=[f"outlet_{i+1}" for i in range(self.config.num_outlets)], 

132 has_phase_equilibrium=False, 

133 is_defined_state=False, 

134 is_build_port=True, 

135 ) 

136 self._outlet_vent_blocks = self._build_state_blocks( 

137 stream_name_list=["outlet_condensate", "outlet_vent"], 

138 has_phase_equilibrium=False, 

139 is_defined_state=False, 

140 is_build_port=True, 

141 ) 

142 self.outlet_blocks = self._outlet_supply_blocks + self._outlet_vent_blocks 

143 self.internal_blocks = self._build_state_blocks( 

144 stream_name_list=["mixed"], 

145 has_phase_equilibrium=True, 

146 is_defined_state=False, 

147 is_build_port=False, 

148 ) 

149 for sb in (self.inlet_blocks + self.internal_blocks + self.outlet_blocks): 

150 for t in sb: 

151 sb[t].flow_mol.setlb(0.0) 

152 

153 """ 

154 2. Create parameters, variables, references and expressions 

155 """ 

156 # References 

157 self.total_flow_mol = Reference(self.mixed_state[:].flow_mol) 

158 self.total_flow_mass = Reference(self.mixed_state[:].flow_mass) 

159 self.pressure = Reference(self.mixed_state[:].pressure) 

160 self.temperature = Reference(self.mixed_state[:].temperature) 

161 self.enth_mol = Reference(self.mixed_state[:].enth_mol) 

162 self.enth_mass = Reference(self.mixed_state[:].enth_mass) 

163 self.vapor_frac = Reference(self.mixed_state[:].vapor_frac) 

164 

165 # State variables 

166 self.heat_loss = Var( 

167 self.flowsheet().time, 

168 initialize=0.0, 

169 bounds=(0, None), 

170 units=units_meta("power"), 

171 doc="Heat loss", 

172 ) 

173 self.pressure_loss = Var( 

174 self.flowsheet().time, 

175 initialize=0, 

176 bounds=(0, None), 

177 units=units_meta("pressure"), 

178 doc="Pressure loss.", 

179 ) 

180 

181 # Non-normal state variables, other parameters and expression 

182 if self.config.is_liquid_header: 

183 self._liq_out_enth_mol = Var( 

184 self.flowsheet().time, 

185 initialize=42.0 * 18 * pyunits.J / pyunits.mol, 

186 units=units_meta("energy") / units_meta("amount"), 

187 doc="Molar enthalpy of the liquid outlets", 

188 ) 

189 else: 

190 self._vap_out_enth_mol = Var( 

191 self.flowsheet().time, 

192 initialize=2700.0 * 18 * pyunits.J / pyunits.mol, 

193 units=units_meta("energy") / units_meta("amount"), 

194 doc="Molar enthalpy of the vapour outlets", 

195 ) 

196 inlet_idx = RangeSet(len(self.inlet_blocks)) 

197 self._minimum_pressure = Var( 

198 self.flowsheet().time, 

199 inlet_idx, 

200 doc="Variable for calculating minimum inlet pressure", 

201 units=units_meta("pressure"), 

202 ) 

203 self._eps_pressure = Param( 

204 mutable=True, 

205 initialize=1e-3, 

206 domain=PositiveReals, 

207 doc="Smoothing term for minimum inlet pressure", 

208 units=units_meta("pressure"), 

209 ) 

210 

211 # Expressions 

212 self.degree_of_superheat = Expression( 

213 self.flowsheet().time, 

214 rule=lambda b, t: ( 

215 b.temperature[t] - b.outlet_condensate_state[t].temperature 

216 ), 

217 ) 

218 self._partial_total_flow_mol = Expression( 

219 self.flowsheet().time, 

220 rule=lambda b, t: ( 

221 sum(o[t].flow_mol for o in (b.inlet_blocks + b._outlet_supply_blocks)) 

222 ), 

223 ) 

224 self.balance_flow_mol = Expression( 

225 self.flowsheet().time, 

226 rule=lambda b, t: ( 

227 sum(i[t].flow_mol for i in b.inlet_blocks) 

228 - 

229 sum( 

230 o[t].flow_mol 

231 for o in ( 

232 b._outlet_supply_blocks 

233 + 

234 [ 

235 b.outlet_vent_state 

236 if self.config.is_liquid_header 

237 else b.outlet_condensate_state, 

238 ] 

239 ) 

240 ) 

241 ), 

242 doc="Flow imbalance between inlets and outlets; positive if in excess of supply to outlets.", 

243 ) 

244 self.makeup_flow_mol = Expression( 

245 self.flowsheet().time, 

246 rule=lambda b, t: ( 

247 ( 

248 b.outlet_condensate_state[t].flow_mol 

249 if self.config.is_liquid_header 

250 else b.outlet_vent_state[t].flow_mol 

251 ) 

252 - 

253 b.balance_flow_mol[t] 

254 ), 

255 ) 

256 

257 """ 

258 3. Declare constraints to define mass, energy, and momentum balances,  

259 unit operation performance and other constraint  

260 """ 

261 # a) Material balance equations 

262 @self.Constraint(self.flowsheet().time, doc="Mixed state material balance") 

263 def mixed_state_material_balance(b, t): 

264 return b.mixed_state[t].flow_mol == sum(i[t].flow_mol for i in b.inlet_blocks) 

265 

266 eps_smooth = 1e-5 # smoothing parameter; smaller = closer to exact max, larger = smoother 

267 eps_div = 1e-6 # small number to prevent division by zero 

268 if self.config.is_liquid_header: 

269 # Assigns excess liquid flow to outlet_condensate 

270 @self.Constraint(self.flowsheet().time, doc="Condensate flow balance.") 

271 def condensate_flow_balance(b, t): 

272 return ( 

273 b.outlet_condensate_state[t].flow_mol 

274 == 

275 smooth_max( 

276 b.balance_flow_mol[t] / (b._partial_total_flow_mol[t] + eps_div*pyunits.mol / pyunits.s), 

277 0.0, 

278 eps_smooth, 

279 ) * (b._partial_total_flow_mol[t] + eps_div*pyunits.mol / pyunits.s) 

280 ) 

281 

282 # Removes any gas/vapour from a liquid header 

283 @self.Constraint(self.flowsheet().time, doc="Vent flow balance.") 

284 def vent_flow_balance(b, t): 

285 return b.outlet_vent_state[t].flow_mol == b.mixed_state[t].flow_mol * b.mixed_state[t].vapor_frac 

286 

287 else: 

288 # Assigns excess steam/vapour flow to outlet_vent 

289 @self.Constraint(self.flowsheet().time, doc="Vent flow balance.") 

290 def vent_flow_balance(b, t): 

291 return ( 

292 b.outlet_vent_state[t].flow_mol 

293 == 

294 smooth_max( 

295 b.balance_flow_mol[t] / (b._partial_total_flow_mol[t] + eps_div*pyunits.mol / pyunits.s), 

296 0.0, 

297 eps_smooth, 

298 ) * (b._partial_total_flow_mol[t] + eps_div*pyunits.mol / pyunits.s) 

299 ) 

300 

301 # Removes any condensate/liquid from a steam/gas header 

302 @self.Constraint(self.flowsheet().time, doc="Condensate flow balance.") 

303 def condensate_flow_balance(b, t): 

304 return b.outlet_condensate_state[t].flow_mol == b.mixed_state[t].flow_mol * (1 - b.mixed_state[t].vapor_frac) 

305 

306 # b) Energy balance equations 

307 @self.Constraint(self.flowsheet().time, doc="Energy balance for inlets to mixed state including heat loss") 

308 def inlets_to_mixed_state_energy_balance(b, t): 

309 return ( 

310 b.mixed_state[t].flow_mol * b.mixed_state[t].enth_mol 

311 + b.heat_loss[t] 

312 == sum(i[t].flow_mol * i[t].enth_mol for i in b.inlet_blocks) 

313 ) 

314 

315 @self.Constraint(self.flowsheet().time, doc="Energy balance for mixed state to outlets") 

316 def mixed_state_to_outlets_energy_balance(b, t): 

317 return ( 

318 b.mixed_state[t].enth_mol * sum(o[t].flow_mol for o in b.outlet_blocks) 

319 == 

320 sum(o[t].flow_mol * o[t].enth_mol for o in b.outlet_blocks) 

321 ) 

322 

323 if self.config.is_liquid_header: 

324 self._outlet_blocks_exc_vent = self._outlet_supply_blocks + [self.outlet_condensate_state] 

325 @self.Constraint(self.flowsheet().time, self._outlet_blocks_exc_vent, 

326 doc="All liquid outlets (incl. condensate) share a common liquid enthalpy", 

327 ) 

328 def molar_enthalpy_equality_eqn(b, t, o): 

329 return o[t].enth_mol == b._liq_out_enth_mol[t] 

330 else: 

331 self._outlet_blocks_exc_condensate = self._outlet_supply_blocks + [self.outlet_vent_state] 

332 @self.Constraint(self.flowsheet().time, self._outlet_blocks_exc_condensate, 

333 doc="All vapour outlets (incl. vent) share a common vapour enthalpy", 

334 ) 

335 def molar_enthalpy_equality_eqn(b, t, o): 

336 return o[t].enth_mol == b._vap_out_enth_mol[t] 

337 

338 # c) Momentum balance equations 

339 @self.Constraint(self.flowsheet().time, inlet_idx, 

340 doc="Calculation for minimum inlet pressure", 

341 ) 

342 def minimum_pressure_constraint(b, t, i): 

343 if i == inlet_idx.first(): 

344 return b._minimum_pressure[t, i] == b.inlet_blocks[i - 1][t].pressure 

345 else: 

346 return ( 

347 b._minimum_pressure[t, i] 

348 == 

349 smooth_min( 

350 b._minimum_pressure[t, i - 1], 

351 b.inlet_blocks[i - 1][t].pressure, 

352 b._eps_pressure, 

353 ) 

354 ) 

355 # Set mixed pressure to minimum inlet pressure minus any pressure loss 

356 @self.Constraint(self.flowsheet().time, doc="Pressure equality constraint from minimum inlet to mixed state") 

357 def mixture_pressure(b, t): 

358 return b.mixed_state[t].pressure == ( 

359 b._minimum_pressure[t, inlet_idx.last()] - b.pressure_loss[t] 

360 ) 

361 # Set outlet pressures to mixed pressure 

362 @self.Constraint(self.flowsheet().time, self.outlet_blocks, 

363 doc="Pressure equality constraint from mixed state to outlets", 

364 ) 

365 def pressure_equality_eqn(b, t, o): 

366 return b.mixed_state[t].pressure == o[t].pressure 

367 

368 # d) Additional constraints for vapour/liquid separation and outlet splits 

369 if self.config.is_liquid_header: 

370 @self.Constraint(self.flowsheet().time, doc="Vent vapour fraction.") 

371 def vent_vapour_fraction(b, t): 

372 return b.outlet_vent_state[t].vapor_frac == 1 # 1 - 1e-6 

373 else: 

374 @self.Constraint(self.flowsheet().time, doc="Condensate vapour fraction.") 

375 def condensate_vapour_fraction(b, t): 

376 return b.outlet_condensate_state[t].vapor_frac == 0 # 1e-6 

377 

378 """ 

379 4. Other model components and references  

380 """ 

381 self.scaling_factor = Suffix(direction=Suffix.EXPORT) 

382 self.outlet_idx = pyo.Set(initialize=self.outlet_list) 

383 # Map each (t, o) to the outlet state's flow var 

384 ref_map = {} 

385 for o in self.outlet_list: 

386 if o != "vent": 386 ↛ 385line 386 didn't jump to line 385 because the condition on line 386 was always true

387 outlet_state_block = getattr(self, f"{o}_state") 

388 for t in self.flowsheet().time: 

389 ref_map[(t, o)] = outlet_state_block[t].flow_mol 

390 

391 self.split_flow = Reference(ref_map) 

392 

393 

394 def initialize_build(self, state_args=None, outlvl=idaeslog.NOTSET, solver=None, optarg=None): 

395 """ 

396 General wrapper for template initialization routines 

397 

398 Keyword Arguments: 

399 state_args : a dict of arguments to be passed to the property 

400 package(s) to provide an initial state for 

401 initialization (see documentation of the specific 

402 property package) (default = {}). 

403 outlvl : sets output level of initialization routine 

404 optarg : solver options dictionary object (default=None) 

405 solver : str indicating which solver to use during 

406 initialization (default = None) 

407 

408 Returns: None 

409 """ 

410 init_log = idaeslog.getInitLogger(self.name, outlvl, tag="unit") 

411 solve_log = idaeslog.getSolveLogger(self.name, outlvl, tag="unit") 

412 t0 = self.flowsheet().config.time.first() 

413 

414 opt = get_solver(solver, optarg) 

415 

416 pp = self.mixed_state[t0].params 

417 state_args = {} if state_args is None else dict(state_args) 

418 

419 shared_state_args = { 

420 key: state_args[key] 

421 for key in ("flow_mol", "pressure", "enth_mol", "temperature") 

422 if key in state_args 

423 } 

424 

425 def _state_seed_args(stream_name): 

426 local_args = dict(shared_state_args) 

427 local_args.update(state_args.get(stream_name, {})) 

428 return local_args 

429 

430 inlet_state_args = {name: _state_seed_args(name) for name in self.inlet_list} 

431 outlet_state_args = {name: _state_seed_args(name) for name in self.outlet_list} 

432 mixed_state_args = _state_seed_args("mixed") 

433 

434 # Helper functions 

435 def _value_or_none(obj): # returns the value of a Var or Param if it is fixed, otherwise returns None 

436 return value(obj, exception=False) 

437 

438 def _pick_seed(*candidates): # loops through different options of seeding and picks the first (in order of preference) 

439 for candidate in candidates: 439 ↛ 442line 439 didn't jump to line 442 because the loop on line 439 didn't complete

440 if candidate is not None: 

441 return candidate 

442 return None 

443 

444 def _enthalpy_from_tp(temperature, pressure): 

445 if temperature is None or pressure is None: 445 ↛ 446line 445 didn't jump to line 446 because the condition on line 445 was never true

446 return None 

447 return value(pp.htpx(temperature * pyunits.K, pressure * pyunits.Pa)) 

448 

449 

450 def _safe_nonnegative(val, fallback=0.0): 

451 if val is None: 

452 return fallback 

453 return max(val, 0.0) 

454 

455 #----------------------------------------------------- 

456 # 1. Build state seeds. Prefer explicit state_args, then connected/fixed 

457 # values already present on the unit, then infer from local specs, then 

458 # fall back to generic guesses. 

459 

460 # Inlets flows ranking 

461 # 1. Explicit state_args 

462 # 2. A fixed inlet flow on this unit 

463 # 3. A propagated upstream inlet flow 

464 # 4. Back-calculate from fixed outlet flow(s) + 10% divided evenly among inlets. The 10% is for venting 

465 # 5. Nominal fallback 

466 

467 # First get an estimate of the total inlet based on outlet flows if they were fixed 

468 fixed_outlet_flows = [] 

469 for outlet_sb in self.outlet_blocks: 

470 outlet_flow_fixed = ( 

471 _value_or_none(outlet_sb[t0].flow_mol) 

472 if outlet_sb[t0].flow_mol.fixed 

473 else None 

474 ) 

475 if outlet_flow_fixed is not None: 

476 fixed_outlet_flows.append(outlet_flow_fixed) 

477 

478 total_fixed_outlet_flow = ( 

479 sum(fixed_outlet_flows) if fixed_outlet_flows else None 

480 ) 

481 nominal_total_flow = _pick_seed(total_fixed_outlet_flow, 500.0) 

482 nominal_inlet_flow = nominal_total_flow / max(len(self.inlet_blocks), 1) 

483 

484 # Now build the inlet seeds using the ranking above 

485 inlet_seeds = {} 

486 seeded_inlet_flows = [] 

487 for inlet_name, inlet_sb in zip(self.inlet_list, self.inlet_blocks): 

488 inlet_has_source = len(list(getattr(self, inlet_name).sources())) > 0 

489 local_args = inlet_state_args[inlet_name] 

490 

491 flow_from_fixed = ( 

492 _value_or_none(inlet_sb[t0].flow_mol) 

493 if inlet_sb[t0].flow_mol.fixed 

494 else None 

495 ) 

496 flow_from_upstream = ( 

497 _value_or_none(inlet_sb[t0].flow_mol) 

498 if inlet_has_source 

499 else None 

500 ) 

501 flow_from_fixed_outlets = ( 

502 total_fixed_outlet_flow / max(len(self.inlet_blocks), 1) * 1.1 

503 if total_fixed_outlet_flow is not None 

504 else None 

505 ) 

506 

507 f_in = _pick_seed( 

508 local_args.get("flow_mol"), 

509 flow_from_fixed, 

510 flow_from_upstream, 

511 flow_from_fixed_outlets, 

512 nominal_inlet_flow, 

513 ) 

514 

515 inlet_seeds[inlet_name] = {"flow_mol": f_in} 

516 seeded_inlet_flows.append(f_in) 

517 

518 # Pass these seeded inlet flows to the mixed state and outlet states 

519 f_mixed = sum(seeded_inlet_flows) 

520 

521 # Outlet flow ranking: 

522 # 1. Fixed outlet flow on this unit 

523 # 2. Even split of seeded inlet flow (minus 10% for venting) divided among numbered outlets, vent gets 10%  

524 

525 # First split the total flow into outlet and vent portions. The outlet portion is then divided evenly among the numbered outlets, and the vent portion is assigned to the vent outlet. This is just a starting guess to help initialization, and will be overridden if there are fixed outlet flows that provide a better seed. 

526 numbered_outlet_flow = 0.9 * f_mixed / max(self.config.num_outlets, 1) 

527 vent_flow = 0.1 * f_mixed 

528 

529 # Now build the outlet seeds using the ranking above 

530 outlet_seeds = {} 

531 seeded_outlet_flows = [] 

532 for outlet_name, outlet_sb in zip(self.outlet_list, self.outlet_blocks): 

533 flow_from_fixed = ( 

534 _value_or_none(outlet_sb[t0].flow_mol) 

535 if outlet_sb[t0].flow_mol.fixed 

536 else None 

537 ) 

538 

539 if outlet_name.startswith("outlet_") and outlet_name not in ( 

540 "outlet_condensate", 

541 "outlet_vent", 

542 ): 

543 default_flow = numbered_outlet_flow 

544 elif outlet_name == "outlet_vent": 

545 default_flow = vent_flow 

546 else: 

547 default_flow = 0.0 

548 

549 f_out = _pick_seed( 

550 flow_from_fixed, 

551 default_flow, 

552 ) 

553 

554 outlet_seeds[outlet_name] = {"flow_mol": f_out} 

555 seeded_outlet_flows.append(f_out) 

556 

557 # Inlet pressure ranking 

558 # 1. Explicit state_args 

559 # 2. A propagated upstream inlet pressure 

560 # 3. A fixed inlet pressure on this unit 

561 # 4. Assume all inlet pressures equal to the outlet pressure + pressure loss 

562 # 5. Nominal fallback of 10 bar  

563 

564 # First determine the outlet pressure if it were fixed  

565 pressure_loss = _pick_seed(_value_or_none(self.pressure_loss[t0]), 0.0) 

566 fixed_outlet_pressure = None 

567 for outlet_sb in self.outlet_blocks: 

568 outlet_pressure_fixed = ( 

569 _value_or_none(outlet_sb[t0].pressure) 

570 if outlet_sb[t0].pressure.fixed 

571 else None 

572 ) 

573 if outlet_pressure_fixed is not None: 573 ↛ 574line 573 didn't jump to line 574 because the condition on line 573 was never true

574 fixed_outlet_pressure = outlet_pressure_fixed 

575 break 

576 

577 inlet_pressure_from_outlet = ( 

578 fixed_outlet_pressure + pressure_loss 

579 if fixed_outlet_pressure is not None 

580 else None 

581 ) 

582 

583 # Now build the inlet pressure seeds using the ranking above 

584 seeded_inlet_pressures = [] 

585 for inlet_name, inlet_sb in zip(self.inlet_list, self.inlet_blocks): 

586 inlet_has_source = len(list(getattr(self, inlet_name).sources())) > 0 

587 local_args = inlet_state_args[inlet_name] 

588 

589 pressure_from_upstream = ( 

590 _value_or_none(inlet_sb[t0].pressure) 

591 if inlet_has_source 

592 else None 

593 ) 

594 pressure_from_fixed = ( 

595 _value_or_none(inlet_sb[t0].pressure) 

596 if inlet_sb[t0].pressure.fixed 

597 else None 

598 ) 

599 

600 p_in = _pick_seed( 

601 local_args.get("pressure"), 

602 pressure_from_upstream, 

603 pressure_from_fixed, 

604 inlet_pressure_from_outlet, 

605 10e5, 

606 ) 

607 

608 inlet_seeds[inlet_name]["pressure"] = p_in 

609 seeded_inlet_pressures.append(p_in) 

610 

611 # Pass the seeded mixed pressure to the outlet states 

612 p_mixed = min(seeded_inlet_pressures) - pressure_loss 

613 for outlet_name, outlet_sb in zip(self.outlet_list, self.outlet_blocks): 

614 outlet_seeds[outlet_name]["pressure"] = p_mixed 

615 

616 # Inlet enthalpy ranking 

617 # 1. Explicit state_args 

618 # 2. A propagated upstream inlet enthalpy 

619 # 3. Fixed inlet enthalpy on this unit 

620 # 4. Back calculate from fixed outlet enthalpies and seeded inlet flows with user heat loss 

621 # 5. Assume inlet enthalpies all equal to the saturation enthalpy + 10 C superheat # TODO: adapt this for water headers 

622 

623 # For rank 4, first determine the outlet enthalpies if they were fixed, and calculate the inlet enthalpy with heat loss 

624 heat_loss = _pick_seed(_value_or_none(self.heat_loss[t0]), 0.0) 

625 inlet_enthalpy_from_outlets = None 

626 if sum(seeded_inlet_flows) > 0: 

627 outlet_energy_terms = [] 

628 all_outlet_enthalpies_fixed = True 

629 for outlet_name, outlet_sb in zip(self.outlet_list, self.outlet_blocks): 629 ↛ 645line 629 didn't jump to line 645 because the loop on line 629 didn't complete

630 outlet_enthalpy_fixed = ( 

631 _value_or_none(outlet_sb[t0].enth_mol) 

632 if outlet_sb[t0].enth_mol.fixed 

633 else None 

634 ) 

635 seeded_outlet_flow = outlet_seeds[outlet_name].get("flow_mol") 

636 

637 if outlet_enthalpy_fixed is None or seeded_outlet_flow is None: 637 ↛ 641line 637 didn't jump to line 641 because the condition on line 637 was always true

638 all_outlet_enthalpies_fixed = False 

639 break 

640 

641 outlet_energy_terms.append( 

642 seeded_outlet_flow * outlet_enthalpy_fixed 

643 ) 

644 

645 if all_outlet_enthalpies_fixed: 645 ↛ 646line 645 didn't jump to line 646 because the condition on line 645 was never true

646 inlet_enthalpy_from_outlets = ( 

647 sum(outlet_energy_terms) + heat_loss 

648 ) / (sum(seeded_inlet_flows) + 1e-6) 

649 

650 seeded_inlet_enthalpies = [] 

651 for inlet_name, inlet_sb in zip(self.inlet_list, self.inlet_blocks): 

652 inlet_has_source = len(list(getattr(self, inlet_name).sources())) > 0 

653 local_args = inlet_state_args[inlet_name] 

654 p_in = inlet_seeds[inlet_name]["pressure"] 

655 flow_mol = inlet_seeds[inlet_name]["flow_mol"] 

656 enthalpy_from_upstream = ( 

657 _value_or_none(inlet_sb[t0].enth_mol) 

658 if inlet_has_source 

659 else None 

660 ) 

661 enthalpy_from_fixed = ( 

662 _value_or_none(inlet_sb[t0].enth_mol) 

663 if inlet_sb[t0].enth_mol.fixed 

664 else None 

665 ) 

666 

667 inlet_sb[t0].pressure.set_value(p_in) 

668 temperature_sat = _value_or_none( 

669 getattr(inlet_sb[t0], "temperature_sat", None) 

670 ) 

671 

672 h_in = _pick_seed( 

673 local_args.get("enth_mol"), 

674 enthalpy_from_upstream, 

675 enthalpy_from_fixed, 

676 inlet_enthalpy_from_outlets, 

677 _enthalpy_from_tp( 

678 temperature_sat + 10.0, 

679 p_in, 

680 ) if temperature_sat is not None else None, 

681 ) 

682 

683 inlet_seeds[inlet_name]["enth_mol"] = h_in 

684 seeded_inlet_enthalpies.append(h_in*flow_mol) 

685 

686 # Seed inlet enthalpy to mixed and outlet states  

687 h_mixed = (sum(seeded_inlet_enthalpies) - pyo.value(self.heat_loss[t0])) / (sum(seeded_inlet_flows) + 1e-6) 

688 for outlet_name, outlet_sb in zip(self.outlet_list, self.outlet_blocks): 

689 outlet_seeds[outlet_name]["enth_mol"] = h_mixed 

690 

691 # TODO: do better seeding of condensate enthalpy 

692 #----------------------------------------------------- 

693 # 2. Initialize state blocks using explicitly seeded values for all state variables 

694 for inlet_name, inlet_sb in zip(self.inlet_list, self.inlet_blocks): 

695 inlet_sb.initialize( 

696 solver=solver, 

697 optarg=optarg, 

698 outlvl=outlvl, 

699 state_args=inlet_seeds[inlet_name], 

700 ) 

701 init_log.info_high(f"{inlet_name} state initialization complete") 

702 

703 flags_mixed = self.mixed_state.initialize( 

704 solver=solver, 

705 optarg=optarg, 

706 outlvl=outlvl, 

707 state_args={ 

708 "flow_mol": _pick_seed(mixed_state_args.get("flow_mol"), f_mixed), 

709 "pressure": _pick_seed(mixed_state_args.get("pressure"), p_mixed), 

710 "enth_mol": _pick_seed(mixed_state_args.get("enth_mol"), h_mixed), 

711 }, 

712 hold_state=True, 

713 ) 

714 init_log.info_high("mixed state initialization complete") 

715 

716 held_outlet_states = {} 

717 for outlet_name, outlet_sb in zip(self.outlet_list, self.outlet_blocks): 

718 held_outlet_states[outlet_name] = outlet_sb.initialize( 

719 solver=solver, 

720 optarg=optarg, 

721 outlvl=outlvl, 

722 state_args=outlet_seeds[outlet_name], 

723 hold_state=True, 

724 ) 

725 init_log.info_high(f"{outlet_name} state initialization complete") 

726 # #----------------------------------------------------- 

727 # # 3. Deactivate constraints not specified in Step 2 then solve first pass 

728 relaxed_eqns = [ 

729 self.mixed_state_material_balance, 

730 self.condensate_flow_balance, 

731 self.vent_flow_balance, 

732 self.inlets_to_mixed_state_energy_balance, 

733 self.mixed_state_to_outlets_energy_balance, 

734 self.molar_enthalpy_equality_eqn, 

735 self.minimum_pressure_constraint, 

736 self.mixture_pressure, 

737 self.pressure_equality_eqn, 

738 ] 

739 if self.config.is_liquid_header: 

740 relaxed_eqns.append(self.vent_vapour_fraction) 

741 else: 

742 relaxed_eqns.append(self.condensate_vapour_fraction) 

743 

744 for con in relaxed_eqns: 

745 con.deactivate() 

746 

747 report_statistics(self) 

748 

749 dt = DiagnosticsToolbox(self) 

750 dt.report_structural_issues() 

751 dt.display_underconstrained_set() 

752 

753 active_constraints = sum( 

754 1 for _ in self.component_data_objects(Constraint, active=True, descend_into=True) 

755 ) 

756 

757 if active_constraints > 0: 757 ↛ 758line 757 didn't jump to line 758 because the condition on line 757 was never true

758 with idaeslog.solver_log(solve_log, idaeslog.DEBUG) as slc: 

759 res = opt.solve(self, tee=slc.tee) 

760 if not check_optimal_termination(res): 

761 dt.report_numerical_issues() 

762 raise InitializationError(f"{self.name} failed relaxed initialization") 

763 else: 

764 init_log.info_high( 

765 "Relaxed initialization pass skipped: no active constraints remained after seeding." 

766 ) 

767 

768 # Restore full model 

769 self.mixed_state.release_state(flags_mixed) 

770 for outlet_name, flags in held_outlet_states.items(): 

771 getattr(self, f"{outlet_name}_state").release_state(flags) 

772 

773 for con in relaxed_eqns: 

774 con.activate() 

775 

776 dof = degrees_of_freedom(self) 

777 # NOTE: Seems header mostly has >0 DoF in tests so check is skipped, but should be revisited to debug it 

778 # if dof != 0: 

779 # raise InitializationError( 

780 # f"{self.name} degrees of freedom were not 0 before final solve. DoF = {dof}" 

781 # ) 

782 

783 with idaeslog.solver_log(solve_log, idaeslog.DEBUG) as slc: 

784 res = opt.solve(self, tee=slc.tee) 

785 if not check_optimal_termination(res): 785 ↛ 786line 785 didn't jump to line 786 because the condition on line 785 was never true

786 raise InitializationError(f"{self.name} failed final initialization") 

787 

788 init_log.info(f"Initialization complete: {idaeslog.condition(res)}") 

789 

790 def _get_performance_contents(self, time_point=0, is_full_report=True): 

791 """Collect performance results for reporting. 

792 

793 Args: 

794 time_point (int | float): Time index at which to report values. 

795 is_full_report (bool): Flag for full or partial performance report. 

796 

797 Returns: 

798 dict: A report of internal unit model results. 

799 """ 

800 if is_full_report: 

801 var_dict = { 

802 "Heat Loss": self.heat_loss[time_point], 

803 "Pressure Drop": self.pressure_loss[time_point], 

804 "Mass Flow": self.mixed_state[time_point].flow_mass, 

805 "Molar Flow": self.mixed_state[time_point].flow_mol, 

806 "Balance Flow": self.balance_flow_mol[time_point], 

807 "Pressure": self.mixed_state[time_point].pressure, 

808 "Temperature": self.mixed_state[time_point].temperature, 

809 "Degree of Superheat": self.degree_of_superheat[time_point], 

810 "Vapour Fraction": self.mixed_state[time_point].vapor_frac, 

811 "Mass Specific Enthalpy": self.mixed_state[time_point].enth_mass, 

812 "Molar Specific Enthalpy": self.mixed_state[time_point].enth_mol, 

813 } 

814 else: 

815 var_dict = { 

816 "Balance Flow": self.balance_flow_mol[time_point], 

817 "Pressure": self.mixed_state[time_point].pressure, 

818 "Temperature": self.mixed_state[time_point].temperature, 

819 "Degree of Superheat": self.degree_of_superheat[time_point], 

820 } 

821 

822 return {"vars": var_dict} 

823 

824 def diagnose(self) -> list[tuple[pyo.Component, str]]: 

825 """Report common header formulation issues for flowsheet diagnostics.""" 

826 problems = [] 

827 

828 for time in self.flowsheet().time: 

829 balance_flow = value(self.balance_flow_mol[time], exception=False) 

830 condensate_flow = value( 

831 self.outlet_condensate_state[time].flow_mol, 

832 exception=False, 

833 ) 

834 total_flow = value(self.total_flow_mol[time], exception=False) 

835 

836 if balance_flow is not None and abs(balance_flow) > 1e-6: 

837 problems.append( 

838 ( 

839 self.balance_flow_mol[time], 

840 f"Balance flow is {balance_flow:.6g} mol/s. " 

841 "This means the header is adding flow from nowhere " 

842 "to make the material balance close. Replace balance flow " 

843 "with an inlet flow variable to specify the makeup.", 

844 ) 

845 ) 

846 

847 if ( 

848 not self.config.is_liquid_header 

849 and condensate_flow is not None 

850 and total_flow is not None 

851 ): 

852 condensate_tolerance = max(abs(total_flow) * 1e-3, 1e-6) 

853 if ( 

854 total_flow > 1e-6 

855 and abs(condensate_flow - total_flow) <= condensate_tolerance 

856 ): 

857 problems.append( 

858 ( 

859 self.total_flow_mol[time], 

860 f"Condensate flow ({condensate_flow:.6g} mol/s) " 

861 f"is approximately equal to total header flow " 

862 f"({total_flow:.6g} mol/s). This means everything " 

863 "entering the steam header is liquid after mixing, " 

864 "so the steam header will fail to solve. Check the " 

865 "inlet conditions and ensure there is sufficient " 

866 "vapour entering the header.", 

867 ) 

868 ) 

869 

870 return problems 

871 

872 

873 # ----------------------------------------------------------------- 

874 # Common utilities 

875 # ----------------------------------------------------------------- 

876 

877 def calculate_scaling_factors(self): 

878 super().calculate_scaling_factors() 

879 iscale.set_scaling_factor(self.heat_loss, 1e-6) 

880 iscale.set_scaling_factor(self.pressure_loss, 1e-6) 

881 iscale.set_scaling_factor(self.balance_flow_mol, 1e-3) 

882 iscale.set_scaling_factor(self._partial_total_flow_mol, 1e-3) 

883 if hasattr(self, "_vap_out_enth_mol"): 

884 iscale.set_scaling_factor(self._vap_out_enth_mol, 1e-6) 

885 

886 

887 def _build_state_blocks( 

888 self, 

889 stream_name_list: Iterable[str], 

890 has_phase_equilibrium: bool, 

891 is_defined_state: Optional[bool] = False, 

892 is_build_port: Optional[bool] = False, 

893 ) -> List[StateBlock]: 

894 blocks: List[StateBlock] = [] 

895 

896 base_args = dict(self.config.property_package_args) 

897 base_args["has_phase_equilibrium"] = has_phase_equilibrium 

898 base_args["defined_state"] = is_defined_state 

899 

900 for stream_name in stream_name_list: 

901 args = dict(base_args) 

902 args["doc"] = f"Thermophysical properties at {stream_name}" 

903 sb = self.config.property_package.build_state_block(self.flowsheet().time, **args) 

904 setattr(self, f"{stream_name}_state", sb) 

905 blocks.append(sb) 

906 

907 if is_build_port: # No port is needed for intermediate/internal state blocks 

908 self.add_port(name=stream_name, block=sb) 

909 

910 return blocks 

911 

912 

913 def _get_stream_table_contents(self, time_point=0): 

914 io_dict = {name: getattr(self, name) for name in [*self.inlet_blocks, *self.outlet_blocks, *self.internal_blocks]} 

915 return create_stream_table_dataframe(io_dict, time_point=time_point)