Coverage for backend/idaes_service/solver/custom/PIDController.py: 8%

148 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1from idaes.models.control.controller import PIDControllerData, ControllerType, ControllerMVBoundType, ControllerAntiwindupType, smooth_bound, smooth_heaviside 

2from pyomo.environ import Var 

3from idaes.core import UnitModelBlockData, declare_process_block_class 

4from pyomo.common.config import ConfigValue, In, Bool 

5import pyomo.environ as pyo 

6from idaes.core.util.exceptions import ConfigurationError 

7import pyomo.dae as pyodae 

8from idaes.core.util import scaling as iscale 

9import functools 

10 

11 

12@declare_process_block_class( 

13 "PIDController2", 

14 doc="PID controller model block. To use this the model must be dynamic.", 

15) 

16class PIDController2Data(UnitModelBlockData): 

17 """ 

18 PID controller class. 

19 """ 

20 

21 CONFIG = UnitModelBlockData.CONFIG() 

22 CONFIG.declare( 

23 "mv_bound_type", 

24 ConfigValue( 

25 default=ControllerMVBoundType.NONE, 

26 domain=In( 

27 [ 

28 ControllerMVBoundType.NONE, 

29 ControllerMVBoundType.SMOOTH_BOUND, 

30 ControllerMVBoundType.LOGISTIC, 

31 ] 

32 ), 

33 description="Type of bounds to apply to the manipulated variable (mv)).", 

34 doc=( 

35 """Type of bounds to apply to the manipulated variable output. If, 

36 bounds are applied, the model parameters **mv_lb** and **mv_ub** set the bounds. 

37 The **default** is ControllerMVBoundType.NONE. See the controller documentation 

38 for details on the mathematical formulation. The options are: 

39 **ControllerMVBoundType.NONE** no bounds, **ControllerMVBoundType.SMOOTH_BOUND** 

40 smoothed mv = min(max(mv_unbound, ub), lb), and **ControllerMVBoundType.LOGISTIC** 

41 logistic function to enforce bounds. 

42 """ 

43 ), 

44 ), 

45 ) 

46 CONFIG.declare( 

47 "calculate_initial_integral", 

48 ConfigValue( 

49 default=True, 

50 domain=Bool, 

51 description="Calculate the initial integral term value if True", 

52 doc="Calculate the initial integral term value if True", 

53 ), 

54 ) 

55 CONFIG.declare( 

56 "controller_type", 

57 ConfigValue( 

58 default=ControllerType.PI, 

59 domain=In( 

60 [ 

61 ControllerType.P, 

62 ControllerType.PI, 

63 ControllerType.PD, 

64 ControllerType.PID, 

65 ] 

66 ), 

67 description="Control type", 

68 doc="""Controller type. The **default** = ControllerType.PI and the 

69 options are: **ControllerType.P** Proportional, **ControllerType.PI** 

70 proportional and integral, **ControllerType.PD** proportional and derivative, and 

71 **ControllerType.PID** proportional, integral, and derivative 

72 """, 

73 ), 

74 ) 

75 CONFIG.declare( 

76 "antiwindup_type", 

77 ConfigValue( 

78 default=ControllerAntiwindupType.NONE, 

79 domain=In( 

80 [ 

81 ControllerAntiwindupType.NONE, 

82 ControllerAntiwindupType.CONDITIONAL_INTEGRATION, 

83 ControllerAntiwindupType.BACK_CALCULATION, 

84 ] 

85 ), 

86 description="Type of antiwindup technique to use.", 

87 doc=( 

88 """Type of antiwindup technique to use. Options are **ControllerAntiwindupType.NONE**, 

89 **ControllerAntiwindupType.CONDITIONAL_INTEGRATION**, and **ControllerAntiwindupType.BACK_CALCULATION**. 

90 See the controller documentation for details on the mathematical formulation. 

91 """ 

92 ), 

93 ), 

94 ) 

95 CONFIG.declare( 

96 "derivative_on_error", 

97 ConfigValue( 

98 default=False, 

99 domain=Bool, 

100 description="Whether basing derivative action on process var or error", 

101 doc="""Naive implementations of derivative action can cause large spikes in 

102 control when the setpoint is changed. One solution is to use the (negative) 

103 derivative of the process variable to calculate derivative action instead 

104 of using the derivative of setpoint error. If **True**, use the derivative of 

105 setpoint error to calculate derivative action. If **False** (default), use the 

106 (negative) derivative of the process variable instead. 

107 """, 

108 ), 

109 ) 

110 

111 def build(self): 

112 """ 

113 Build the PID block 

114 """ 

115 super().build() 

116 

117 

118 if self.config.dynamic is False: 

119 raise ConfigurationError( 

120 "PIDControllers work only with dynamic flowsheets." 

121 ) 

122 

123 """ 

124 We want to create the controlled and manipulated variables internally, and then the user can use constraints to set them later on. 

125 """ 

126 

127 self.process_var = Var( 

128 self.flowsheet().time, 

129 initialize=0.0, 

130 doc="Setpoint for the PID controller", 

131 ) 

132 self.manipulated_var = Var( 

133 self.flowsheet().time, 

134 initialize=0.0, 

135 doc="Manipulated variable for the PID controller", 

136 ) 

137 

138 # Shorter pointers to time set information 

139 time_set = self.flowsheet().time 

140 time_units = self.flowsheet().time_units 

141 if time_units is None: 

142 time_units = pyo.units.dimensionless 

143 t0 = time_set.first() 

144 

145 # Type Check 

146 if not issubclass(self.process_var[t0].ctype, (pyo.Var, pyo.Expression)): 

147 raise TypeError( 

148 f"process_var must reference a Var or Expression not {self.process_var[t0].ctype}" 

149 ) 

150 if not issubclass(self.manipulated_var[t0].ctype, pyo.Var): 

151 raise TypeError( 

152 f"manipulated_var must reference a Var not {self.manipulated_var[t0].ctype}" 

153 ) 

154 

155 if not self.config.antiwindup_type == ControllerAntiwindupType.NONE: 

156 if not self.config.controller_type in [ 

157 ControllerType.PI, 

158 ControllerType.PID, 

159 ]: 

160 raise ConfigurationError( 

161 "User specified antiwindup method for controller without integral action." 

162 ) 

163 if self.config.mv_bound_type == ControllerMVBoundType.NONE: 

164 raise ConfigurationError( 

165 "User specified antiwindup method for unbounded MV." 

166 ) 

167 

168 # Get the appropriate units for various controller variables 

169 mv_units = pyo.units.get_units(self.manipulated_var[t0]) 

170 pv_units = pyo.units.get_units(self.process_var[t0]) 

171 if mv_units is None: 

172 mv_units = pyo.units.dimensionless 

173 if pv_units is None: 

174 pv_units = pyo.units.dimensionless 

175 gain_p_units = mv_units / pv_units 

176 

177 if not self.config.mv_bound_type == ControllerMVBoundType.NONE: 

178 # Parameters 

179 self.mv_lb = pyo.Param( 

180 mutable=True, 

181 initialize=0.05, 

182 doc="Controller output lower bound", 

183 units=mv_units, 

184 ) 

185 self.mv_ub = pyo.Param( 

186 mutable=True, 

187 initialize=1, 

188 doc="Controller output upper bound", 

189 units=mv_units, 

190 ) 

191 if self.config.mv_bound_type == ControllerMVBoundType.SMOOTH_BOUND: 

192 self.smooth_eps = pyo.Param( 

193 mutable=True, 

194 initialize=1e-4, 

195 doc="Smoothing parameter for controller output limits when the bound" 

196 " type is SMOOTH_BOUND", 

197 units=mv_units, 

198 ) 

199 elif self.config.mv_bound_type == ControllerMVBoundType.LOGISTIC: 

200 self.logistic_bound_k = pyo.Param( 

201 mutable=True, 

202 initialize=4, 

203 doc="Smoothing parameter for controller output limits when the bound" 

204 " type is LOGISTIC", 

205 units=pyo.units.dimensionless, 

206 ) 

207 if ( 

208 self.config.antiwindup_type 

209 == ControllerAntiwindupType.CONDITIONAL_INTEGRATION 

210 ): 

211 self.conditional_integration_k = pyo.Param( 

212 mutable=True, 

213 initialize=200, 

214 doc="Parameter governing steepness of transition between integrating and not integrating." 

215 "A larger value means a steeper transition.", 

216 units=pyo.units.dimensionless, 

217 ) 

218 

219 # Variable for basic controller settings may change with time. 

220 self.setpoint = pyo.Var( 

221 time_set, initialize=0.5, doc="Setpoint", units=pv_units 

222 ) 

223 self.gain_p = pyo.Var( 

224 time_set, 

225 initialize=0.1, 

226 doc="Gain for proportional part", 

227 units=gain_p_units, 

228 ) 

229 if self.config.controller_type in [ControllerType.PI, ControllerType.PID]: 

230 self.gain_i = pyo.Var( 

231 time_set, 

232 initialize=0.1, 

233 doc="Gain for integral part", 

234 units=gain_p_units / time_units, 

235 ) 

236 if self.config.controller_type in [ControllerType.PD, ControllerType.PID]: 

237 self.gain_d = pyo.Var( 

238 time_set, 

239 initialize=0.01, 

240 doc="Gain for derivative part", 

241 units=gain_p_units * time_units, 

242 ) 

243 

244 if self.config.antiwindup_type == ControllerAntiwindupType.BACK_CALCULATION: 

245 self.gain_b = pyo.Var( 

246 time_set, 

247 initialize=0.1, 

248 doc="Gain for back calculation antiwindup", 

249 units=1 / time_units, 

250 ) 

251 

252 self.mv_ref = pyo.Var( 

253 time_set, 

254 initialize=0.5, 

255 doc="Controller bias", 

256 units=mv_units, 

257 ) 

258 

259 # Error expression or variable (variable required for derivative term) 

260 if ( 

261 self.config.controller_type in [ControllerType.PD, ControllerType.PID] 

262 and self.config.derivative_on_error 

263 ): 

264 self.error = pyo.Var( 

265 time_set, initialize=0, doc="Error variable", units=pv_units 

266 ) 

267 

268 @self.Constraint(time_set, doc="Error constraint") 

269 def error_eqn(b, t): 

270 return b.error[t] == b.setpoint[t] - b.process_var[t] 

271 

272 self.derivative_term = pyodae.DerivativeVar( 

273 self.error, 

274 wrt=self.flowsheet().time, 

275 initialize=0, 

276 units=pv_units / time_units, 

277 ) 

278 

279 else: 

280 

281 @self.Expression(time_set, doc="Error expression") 

282 def error(b, t): 

283 return b.setpoint[t] - b.process_var[t] 

284 

285 if ( 

286 self.config.controller_type in [ControllerType.PD, ControllerType.PID] 

287 and not self.config.derivative_on_error 

288 ): 

289 # Need to create a Var because process_var might be an Expression 

290 self.negative_pv = pyo.Var( 

291 time_set, 

292 initialize=0, 

293 doc="Negative of process variable", 

294 units=pv_units, 

295 ) 

296 

297 @self.Constraint(time_set, doc="Negative process variable equation") 

298 def negative_pv_eqn(b, t): 

299 return b.negative_pv[t] == -b.process_var[t] 

300 

301 self.derivative_term = pyodae.DerivativeVar( 

302 self.negative_pv, 

303 wrt=self.flowsheet().time, 

304 initialize=0, 

305 units=pv_units / time_units, 

306 ) 

307 

308 # integral term written de_i(t)/dt = e(t) 

309 if self.config.controller_type in [ControllerType.PI, ControllerType.PID]: 

310 self.mv_integral_component = pyo.Var( 

311 time_set, 

312 initialize=0, 

313 doc="Integral contribution to control action", 

314 units=mv_units, 

315 ) 

316 self.mv_integral_component_dot = pyodae.DerivativeVar( 

317 self.mv_integral_component, 

318 wrt=time_set, 

319 initialize=0, 

320 units=mv_units / time_units, 

321 doc="Rate of change of integral contribution to control action", 

322 ) 

323 

324 if self.config.calculate_initial_integral: 

325 

326 @self.Constraint(doc="Calculate initial e_i based on output") 

327 def initial_integral_error_eqn(b): 

328 if self.config.controller_type == ControllerType.PI: 

329 return b.mv_integral_component[t0] == ( 

330 b.manipulated_var[t0] 

331 - b.mv_ref[t0] 

332 - b.gain_p[t0] * b.error[t0] 

333 ) 

334 return b.mv_integral_component[t0] == ( 

335 b.manipulated_var[t0] 

336 - b.mv_ref[t0] 

337 - b.gain_p[t0] * b.error[t0] 

338 - b.gain_d[t0] * b.derivative_term[t0] 

339 ) 

340 

341 @self.Expression(time_set, doc="Unbounded output for manipulated variable") 

342 def mv_unbounded(b, t): 

343 if self.config.controller_type == ControllerType.PID: 

344 return ( 

345 b.mv_ref[t] 

346 + b.gain_p[t] * b.error[t] 

347 + b.mv_integral_component[t] 

348 + b.gain_d[t] * b.derivative_term[t] 

349 ) 

350 elif self.config.controller_type == ControllerType.PI: 

351 return ( 

352 b.mv_ref[t] + b.gain_p[t] * b.error[t] + b.mv_integral_component[t] 

353 ) 

354 elif self.config.controller_type == ControllerType.PD: 

355 return ( 

356 b.mv_ref[t] 

357 + b.gain_p[t] * b.error[t] 

358 + b.gain_d[t] * b.derivative_term[t] 

359 ) 

360 elif self.config.controller_type == ControllerType.P: 

361 return b.mv_ref[t] + b.gain_p[t] * b.error[t] 

362 else: 

363 raise ConfigurationError( 

364 f"{self.config.controller_type} is not a valid PID controller type" 

365 ) 

366 

367 @self.Constraint(time_set, doc="Bounded output of manipulated variable") 

368 def mv_eqn(b, t): 

369 if self.config.mv_bound_type == ControllerMVBoundType.SMOOTH_BOUND: 

370 return b.manipulated_var[t] == smooth_bound( 

371 b.mv_unbounded[t], lb=b.mv_lb, ub=b.mv_ub, eps=b.smooth_eps 

372 ) 

373 elif self.config.mv_bound_type == ControllerMVBoundType.LOGISTIC: 

374 return ( 

375 (b.manipulated_var[t] - b.mv_lb) 

376 * ( 

377 1 

378 + pyo.exp( 

379 -b.logistic_bound_k 

380 * (b.mv_unbounded[t] - (b.mv_lb + b.mv_ub) / 2) 

381 / (b.mv_ub - b.mv_lb) 

382 ) 

383 ) 

384 ) == b.mv_ub - b.mv_lb 

385 return b.manipulated_var[t] == b.mv_unbounded[t] 

386 

387 # deactivate the time 0 mv_eqn instead of skip, should be fine since 

388 # first time step always exists. 

389 if self.config.calculate_initial_integral: 

390 self.mv_eqn[t0].deactivate() 

391 

392 if self.config.controller_type in [ControllerType.PI, ControllerType.PID]: 

393 

394 @self.Constraint(time_set, doc="de_i(t)/dt = e(t)") 

395 def mv_integration_eqn(b, t): 

396 if ( 

397 self.config.antiwindup_type 

398 == ControllerAntiwindupType.CONDITIONAL_INTEGRATION 

399 ): 

400 # This expression is not sensitive to whether the "right" or "wrong" bound is active for a given 

401 # expression of error. 

402 return b.mv_integral_component_dot[t] == b.gain_i[t] * b.error[ 

403 t 

404 ] * ( 

405 smooth_heaviside( 

406 (b.mv_unbounded[t] - b.mv_lb) / (b.mv_ub - b.mv_lb), 

407 b.conditional_integration_k, 

408 ) 

409 # 1 

410 - smooth_heaviside( 

411 (b.mv_unbounded[t] - b.mv_ub) / (b.mv_ub - b.mv_lb), 

412 b.conditional_integration_k, 

413 ) 

414 ) 

415 elif ( 

416 self.config.antiwindup_type 

417 == ControllerAntiwindupType.BACK_CALCULATION 

418 ): 

419 return b.mv_integral_component_dot[t] == b.gain_i[t] * b.error[ 

420 t 

421 ] + b.gain_b[t] * (b.manipulated_var[t] - b.mv_unbounded[t]) 

422 else: 

423 return b.mv_integral_component_dot[t] == b.gain_i[t] * b.error[t] 

424 

425 self.mv_integration_eqn[t0].deactivate() 

426 

427 

428 # TODO: Initial values. 

429 # For now, mv_integral_component is 0 

430 self.mv_integral_component[t0].fix(0) 

431 

432 def calculate_scaling_factors(self): 

433 super().calculate_scaling_factors() 

434 gsf = iscale.get_scaling_factor 

435 ssf = functools.partial(iscale.set_scaling_factor, overwrite=False) 

436 cst = functools.partial(iscale.constraint_scaling_transform, overwrite=False) 

437 

438 # orig_pv = self.config.process_var 

439 # orig_mv = self.config.manipulated_var 

440 time_set = self.flowsheet().time 

441 t0 = time_set.first() 

442 

443 sf_pv = iscale.get_scaling_factor(self.config.process_var[t0]) 

444 if sf_pv is None: 

445 sf_pv = iscale.get_scaling_factor(self.process_var[t0], default=1) 

446 sf_mv = iscale.get_scaling_factor(self.config.manipulated_var[t0]) 

447 if sf_mv is None: 

448 sf_mv = iscale.get_scaling_factor(self.manipulated_var[t0], default=1) 

449 

450 # Don't like calling scaling laterally like this, but we need scaling factors for the pv and mv 

451 # Except this causes a StackOverflow with flowsheet-level PVs or MVs---put this on ice for now 

452 # if sf_pv is None: 

453 # try: 

454 # iscale.calculate_scaling_factors(self.config.process_var.parent_block()) 

455 # except RecursionError: 

456 # raise ConfigurationError( 

457 # f"Circular scaling dependency detected in Controller {self.name}. The only way this should be " 

458 # "able to happen is if a loop of controllers exists manipulating each others setpoints without " 

459 # "terminating in an actual process variable." 

460 # ) 

461 # if sf_mv is None: 

462 # try: 

463 # iscale.calculate_scaling_factors(self.config.manipulated_var.parent_block()) 

464 # except RecursionError: 

465 # raise ConfigurationError( 

466 # f"Circular scaling dependency detected in Controller {self.name}. The only way this should be " 

467 # "able to happen is if a loop of controllers exists manipulating each others setpoints without " 

468 # "terminating in an actual process variable." 

469 # ) 

470 

471 if self.config.calculate_initial_integral: 

472 sf_mv = gsf(self.manipulated_var[t0], default=1, warning=True) 

473 cst(self.initial_integral_error_eqn, sf_mv) 

474 

475 for t in time_set: 

476 sf_pv = gsf(self.process_var[t], default=1, warning=True) 

477 sf_mv = gsf(self.manipulated_var[t], default=1, warning=True) 

478 

479 ssf(self.setpoint[t], sf_pv) 

480 ssf(self.mv_ref[t], sf_mv) 

481 cst(self.mv_eqn[t], sf_mv) 

482 

483 if self.config.controller_type in [ControllerType.PD, ControllerType.PID]: 

484 if self.config.derivative_on_error: 

485 ssf(self.error[t], sf_pv) 

486 cst(self.error_eqn[t], sf_pv) 

487 else: 

488 ssf(self.negative_pv[t], sf_pv) 

489 cst(self.negative_pv_eqn[t], sf_pv) 

490 

491 if self.config.controller_type in [ControllerType.PI, ControllerType.PID]: 

492 ssf(self.mv_integral_component[t], sf_mv) 

493 

494 cst(self.mv_integration_eqn[t], sf_pv)