Coverage for backend/idaes_service/solver/custom/PIDController.py: 8%
148 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1from idaes.models.control.controller import PIDControllerData, ControllerType, ControllerMVBoundType, ControllerAntiwindupType, smooth_bound, smooth_heaviside
2from pyomo.environ import Var
3from idaes.core import UnitModelBlockData, declare_process_block_class
4from pyomo.common.config import ConfigValue, In, Bool
5import pyomo.environ as pyo
6from idaes.core.util.exceptions import ConfigurationError
7import pyomo.dae as pyodae
8from idaes.core.util import scaling as iscale
9import functools
12@declare_process_block_class(
13 "PIDController2",
14 doc="PID controller model block. To use this the model must be dynamic.",
15)
16class PIDController2Data(UnitModelBlockData):
17 """
18 PID controller class.
19 """
21 CONFIG = UnitModelBlockData.CONFIG()
22 CONFIG.declare(
23 "mv_bound_type",
24 ConfigValue(
25 default=ControllerMVBoundType.NONE,
26 domain=In(
27 [
28 ControllerMVBoundType.NONE,
29 ControllerMVBoundType.SMOOTH_BOUND,
30 ControllerMVBoundType.LOGISTIC,
31 ]
32 ),
33 description="Type of bounds to apply to the manipulated variable (mv)).",
34 doc=(
35 """Type of bounds to apply to the manipulated variable output. If,
36 bounds are applied, the model parameters **mv_lb** and **mv_ub** set the bounds.
37 The **default** is ControllerMVBoundType.NONE. See the controller documentation
38 for details on the mathematical formulation. The options are:
39 **ControllerMVBoundType.NONE** no bounds, **ControllerMVBoundType.SMOOTH_BOUND**
40 smoothed mv = min(max(mv_unbound, ub), lb), and **ControllerMVBoundType.LOGISTIC**
41 logistic function to enforce bounds.
42 """
43 ),
44 ),
45 )
46 CONFIG.declare(
47 "calculate_initial_integral",
48 ConfigValue(
49 default=True,
50 domain=Bool,
51 description="Calculate the initial integral term value if True",
52 doc="Calculate the initial integral term value if True",
53 ),
54 )
55 CONFIG.declare(
56 "controller_type",
57 ConfigValue(
58 default=ControllerType.PI,
59 domain=In(
60 [
61 ControllerType.P,
62 ControllerType.PI,
63 ControllerType.PD,
64 ControllerType.PID,
65 ]
66 ),
67 description="Control type",
68 doc="""Controller type. The **default** = ControllerType.PI and the
69 options are: **ControllerType.P** Proportional, **ControllerType.PI**
70 proportional and integral, **ControllerType.PD** proportional and derivative, and
71 **ControllerType.PID** proportional, integral, and derivative
72 """,
73 ),
74 )
75 CONFIG.declare(
76 "antiwindup_type",
77 ConfigValue(
78 default=ControllerAntiwindupType.NONE,
79 domain=In(
80 [
81 ControllerAntiwindupType.NONE,
82 ControllerAntiwindupType.CONDITIONAL_INTEGRATION,
83 ControllerAntiwindupType.BACK_CALCULATION,
84 ]
85 ),
86 description="Type of antiwindup technique to use.",
87 doc=(
88 """Type of antiwindup technique to use. Options are **ControllerAntiwindupType.NONE**,
89 **ControllerAntiwindupType.CONDITIONAL_INTEGRATION**, and **ControllerAntiwindupType.BACK_CALCULATION**.
90 See the controller documentation for details on the mathematical formulation.
91 """
92 ),
93 ),
94 )
95 CONFIG.declare(
96 "derivative_on_error",
97 ConfigValue(
98 default=False,
99 domain=Bool,
100 description="Whether basing derivative action on process var or error",
101 doc="""Naive implementations of derivative action can cause large spikes in
102 control when the setpoint is changed. One solution is to use the (negative)
103 derivative of the process variable to calculate derivative action instead
104 of using the derivative of setpoint error. If **True**, use the derivative of
105 setpoint error to calculate derivative action. If **False** (default), use the
106 (negative) derivative of the process variable instead.
107 """,
108 ),
109 )
111 def build(self):
112 """
113 Build the PID block
114 """
115 super().build()
118 if self.config.dynamic is False:
119 raise ConfigurationError(
120 "PIDControllers work only with dynamic flowsheets."
121 )
123 """
124 We want to create the controlled and manipulated variables internally, and then the user can use constraints to set them later on.
125 """
127 self.process_var = Var(
128 self.flowsheet().time,
129 initialize=0.0,
130 doc="Setpoint for the PID controller",
131 )
132 self.manipulated_var = Var(
133 self.flowsheet().time,
134 initialize=0.0,
135 doc="Manipulated variable for the PID controller",
136 )
138 # Shorter pointers to time set information
139 time_set = self.flowsheet().time
140 time_units = self.flowsheet().time_units
141 if time_units is None:
142 time_units = pyo.units.dimensionless
143 t0 = time_set.first()
145 # Type Check
146 if not issubclass(self.process_var[t0].ctype, (pyo.Var, pyo.Expression)):
147 raise TypeError(
148 f"process_var must reference a Var or Expression not {self.process_var[t0].ctype}"
149 )
150 if not issubclass(self.manipulated_var[t0].ctype, pyo.Var):
151 raise TypeError(
152 f"manipulated_var must reference a Var not {self.manipulated_var[t0].ctype}"
153 )
155 if not self.config.antiwindup_type == ControllerAntiwindupType.NONE:
156 if not self.config.controller_type in [
157 ControllerType.PI,
158 ControllerType.PID,
159 ]:
160 raise ConfigurationError(
161 "User specified antiwindup method for controller without integral action."
162 )
163 if self.config.mv_bound_type == ControllerMVBoundType.NONE:
164 raise ConfigurationError(
165 "User specified antiwindup method for unbounded MV."
166 )
168 # Get the appropriate units for various controller variables
169 mv_units = pyo.units.get_units(self.manipulated_var[t0])
170 pv_units = pyo.units.get_units(self.process_var[t0])
171 if mv_units is None:
172 mv_units = pyo.units.dimensionless
173 if pv_units is None:
174 pv_units = pyo.units.dimensionless
175 gain_p_units = mv_units / pv_units
177 if not self.config.mv_bound_type == ControllerMVBoundType.NONE:
178 # Parameters
179 self.mv_lb = pyo.Param(
180 mutable=True,
181 initialize=0.05,
182 doc="Controller output lower bound",
183 units=mv_units,
184 )
185 self.mv_ub = pyo.Param(
186 mutable=True,
187 initialize=1,
188 doc="Controller output upper bound",
189 units=mv_units,
190 )
191 if self.config.mv_bound_type == ControllerMVBoundType.SMOOTH_BOUND:
192 self.smooth_eps = pyo.Param(
193 mutable=True,
194 initialize=1e-4,
195 doc="Smoothing parameter for controller output limits when the bound"
196 " type is SMOOTH_BOUND",
197 units=mv_units,
198 )
199 elif self.config.mv_bound_type == ControllerMVBoundType.LOGISTIC:
200 self.logistic_bound_k = pyo.Param(
201 mutable=True,
202 initialize=4,
203 doc="Smoothing parameter for controller output limits when the bound"
204 " type is LOGISTIC",
205 units=pyo.units.dimensionless,
206 )
207 if (
208 self.config.antiwindup_type
209 == ControllerAntiwindupType.CONDITIONAL_INTEGRATION
210 ):
211 self.conditional_integration_k = pyo.Param(
212 mutable=True,
213 initialize=200,
214 doc="Parameter governing steepness of transition between integrating and not integrating."
215 "A larger value means a steeper transition.",
216 units=pyo.units.dimensionless,
217 )
219 # Variable for basic controller settings may change with time.
220 self.setpoint = pyo.Var(
221 time_set, initialize=0.5, doc="Setpoint", units=pv_units
222 )
223 self.gain_p = pyo.Var(
224 time_set,
225 initialize=0.1,
226 doc="Gain for proportional part",
227 units=gain_p_units,
228 )
229 if self.config.controller_type in [ControllerType.PI, ControllerType.PID]:
230 self.gain_i = pyo.Var(
231 time_set,
232 initialize=0.1,
233 doc="Gain for integral part",
234 units=gain_p_units / time_units,
235 )
236 if self.config.controller_type in [ControllerType.PD, ControllerType.PID]:
237 self.gain_d = pyo.Var(
238 time_set,
239 initialize=0.01,
240 doc="Gain for derivative part",
241 units=gain_p_units * time_units,
242 )
244 if self.config.antiwindup_type == ControllerAntiwindupType.BACK_CALCULATION:
245 self.gain_b = pyo.Var(
246 time_set,
247 initialize=0.1,
248 doc="Gain for back calculation antiwindup",
249 units=1 / time_units,
250 )
252 self.mv_ref = pyo.Var(
253 time_set,
254 initialize=0.5,
255 doc="Controller bias",
256 units=mv_units,
257 )
259 # Error expression or variable (variable required for derivative term)
260 if (
261 self.config.controller_type in [ControllerType.PD, ControllerType.PID]
262 and self.config.derivative_on_error
263 ):
264 self.error = pyo.Var(
265 time_set, initialize=0, doc="Error variable", units=pv_units
266 )
268 @self.Constraint(time_set, doc="Error constraint")
269 def error_eqn(b, t):
270 return b.error[t] == b.setpoint[t] - b.process_var[t]
272 self.derivative_term = pyodae.DerivativeVar(
273 self.error,
274 wrt=self.flowsheet().time,
275 initialize=0,
276 units=pv_units / time_units,
277 )
279 else:
281 @self.Expression(time_set, doc="Error expression")
282 def error(b, t):
283 return b.setpoint[t] - b.process_var[t]
285 if (
286 self.config.controller_type in [ControllerType.PD, ControllerType.PID]
287 and not self.config.derivative_on_error
288 ):
289 # Need to create a Var because process_var might be an Expression
290 self.negative_pv = pyo.Var(
291 time_set,
292 initialize=0,
293 doc="Negative of process variable",
294 units=pv_units,
295 )
297 @self.Constraint(time_set, doc="Negative process variable equation")
298 def negative_pv_eqn(b, t):
299 return b.negative_pv[t] == -b.process_var[t]
301 self.derivative_term = pyodae.DerivativeVar(
302 self.negative_pv,
303 wrt=self.flowsheet().time,
304 initialize=0,
305 units=pv_units / time_units,
306 )
308 # integral term written de_i(t)/dt = e(t)
309 if self.config.controller_type in [ControllerType.PI, ControllerType.PID]:
310 self.mv_integral_component = pyo.Var(
311 time_set,
312 initialize=0,
313 doc="Integral contribution to control action",
314 units=mv_units,
315 )
316 self.mv_integral_component_dot = pyodae.DerivativeVar(
317 self.mv_integral_component,
318 wrt=time_set,
319 initialize=0,
320 units=mv_units / time_units,
321 doc="Rate of change of integral contribution to control action",
322 )
324 if self.config.calculate_initial_integral:
326 @self.Constraint(doc="Calculate initial e_i based on output")
327 def initial_integral_error_eqn(b):
328 if self.config.controller_type == ControllerType.PI:
329 return b.mv_integral_component[t0] == (
330 b.manipulated_var[t0]
331 - b.mv_ref[t0]
332 - b.gain_p[t0] * b.error[t0]
333 )
334 return b.mv_integral_component[t0] == (
335 b.manipulated_var[t0]
336 - b.mv_ref[t0]
337 - b.gain_p[t0] * b.error[t0]
338 - b.gain_d[t0] * b.derivative_term[t0]
339 )
341 @self.Expression(time_set, doc="Unbounded output for manipulated variable")
342 def mv_unbounded(b, t):
343 if self.config.controller_type == ControllerType.PID:
344 return (
345 b.mv_ref[t]
346 + b.gain_p[t] * b.error[t]
347 + b.mv_integral_component[t]
348 + b.gain_d[t] * b.derivative_term[t]
349 )
350 elif self.config.controller_type == ControllerType.PI:
351 return (
352 b.mv_ref[t] + b.gain_p[t] * b.error[t] + b.mv_integral_component[t]
353 )
354 elif self.config.controller_type == ControllerType.PD:
355 return (
356 b.mv_ref[t]
357 + b.gain_p[t] * b.error[t]
358 + b.gain_d[t] * b.derivative_term[t]
359 )
360 elif self.config.controller_type == ControllerType.P:
361 return b.mv_ref[t] + b.gain_p[t] * b.error[t]
362 else:
363 raise ConfigurationError(
364 f"{self.config.controller_type} is not a valid PID controller type"
365 )
367 @self.Constraint(time_set, doc="Bounded output of manipulated variable")
368 def mv_eqn(b, t):
369 if self.config.mv_bound_type == ControllerMVBoundType.SMOOTH_BOUND:
370 return b.manipulated_var[t] == smooth_bound(
371 b.mv_unbounded[t], lb=b.mv_lb, ub=b.mv_ub, eps=b.smooth_eps
372 )
373 elif self.config.mv_bound_type == ControllerMVBoundType.LOGISTIC:
374 return (
375 (b.manipulated_var[t] - b.mv_lb)
376 * (
377 1
378 + pyo.exp(
379 -b.logistic_bound_k
380 * (b.mv_unbounded[t] - (b.mv_lb + b.mv_ub) / 2)
381 / (b.mv_ub - b.mv_lb)
382 )
383 )
384 ) == b.mv_ub - b.mv_lb
385 return b.manipulated_var[t] == b.mv_unbounded[t]
387 # deactivate the time 0 mv_eqn instead of skip, should be fine since
388 # first time step always exists.
389 if self.config.calculate_initial_integral:
390 self.mv_eqn[t0].deactivate()
392 if self.config.controller_type in [ControllerType.PI, ControllerType.PID]:
394 @self.Constraint(time_set, doc="de_i(t)/dt = e(t)")
395 def mv_integration_eqn(b, t):
396 if (
397 self.config.antiwindup_type
398 == ControllerAntiwindupType.CONDITIONAL_INTEGRATION
399 ):
400 # This expression is not sensitive to whether the "right" or "wrong" bound is active for a given
401 # expression of error.
402 return b.mv_integral_component_dot[t] == b.gain_i[t] * b.error[
403 t
404 ] * (
405 smooth_heaviside(
406 (b.mv_unbounded[t] - b.mv_lb) / (b.mv_ub - b.mv_lb),
407 b.conditional_integration_k,
408 )
409 # 1
410 - smooth_heaviside(
411 (b.mv_unbounded[t] - b.mv_ub) / (b.mv_ub - b.mv_lb),
412 b.conditional_integration_k,
413 )
414 )
415 elif (
416 self.config.antiwindup_type
417 == ControllerAntiwindupType.BACK_CALCULATION
418 ):
419 return b.mv_integral_component_dot[t] == b.gain_i[t] * b.error[
420 t
421 ] + b.gain_b[t] * (b.manipulated_var[t] - b.mv_unbounded[t])
422 else:
423 return b.mv_integral_component_dot[t] == b.gain_i[t] * b.error[t]
425 self.mv_integration_eqn[t0].deactivate()
428 # TODO: Initial values.
429 # For now, mv_integral_component is 0
430 self.mv_integral_component[t0].fix(0)
432 def calculate_scaling_factors(self):
433 super().calculate_scaling_factors()
434 gsf = iscale.get_scaling_factor
435 ssf = functools.partial(iscale.set_scaling_factor, overwrite=False)
436 cst = functools.partial(iscale.constraint_scaling_transform, overwrite=False)
438 # orig_pv = self.config.process_var
439 # orig_mv = self.config.manipulated_var
440 time_set = self.flowsheet().time
441 t0 = time_set.first()
443 sf_pv = iscale.get_scaling_factor(self.config.process_var[t0])
444 if sf_pv is None:
445 sf_pv = iscale.get_scaling_factor(self.process_var[t0], default=1)
446 sf_mv = iscale.get_scaling_factor(self.config.manipulated_var[t0])
447 if sf_mv is None:
448 sf_mv = iscale.get_scaling_factor(self.manipulated_var[t0], default=1)
450 # Don't like calling scaling laterally like this, but we need scaling factors for the pv and mv
451 # Except this causes a StackOverflow with flowsheet-level PVs or MVs---put this on ice for now
452 # if sf_pv is None:
453 # try:
454 # iscale.calculate_scaling_factors(self.config.process_var.parent_block())
455 # except RecursionError:
456 # raise ConfigurationError(
457 # f"Circular scaling dependency detected in Controller {self.name}. The only way this should be "
458 # "able to happen is if a loop of controllers exists manipulating each others setpoints without "
459 # "terminating in an actual process variable."
460 # )
461 # if sf_mv is None:
462 # try:
463 # iscale.calculate_scaling_factors(self.config.manipulated_var.parent_block())
464 # except RecursionError:
465 # raise ConfigurationError(
466 # f"Circular scaling dependency detected in Controller {self.name}. The only way this should be "
467 # "able to happen is if a loop of controllers exists manipulating each others setpoints without "
468 # "terminating in an actual process variable."
469 # )
471 if self.config.calculate_initial_integral:
472 sf_mv = gsf(self.manipulated_var[t0], default=1, warning=True)
473 cst(self.initial_integral_error_eqn, sf_mv)
475 for t in time_set:
476 sf_pv = gsf(self.process_var[t], default=1, warning=True)
477 sf_mv = gsf(self.manipulated_var[t], default=1, warning=True)
479 ssf(self.setpoint[t], sf_pv)
480 ssf(self.mv_ref[t], sf_mv)
481 cst(self.mv_eqn[t], sf_mv)
483 if self.config.controller_type in [ControllerType.PD, ControllerType.PID]:
484 if self.config.derivative_on_error:
485 ssf(self.error[t], sf_pv)
486 cst(self.error_eqn[t], sf_pv)
487 else:
488 ssf(self.negative_pv[t], sf_pv)
489 cst(self.negative_pv_eqn[t], sf_pv)
491 if self.config.controller_type in [ControllerType.PI, ControllerType.PID]:
492 ssf(self.mv_integral_component[t], sf_mv)
494 cst(self.mv_integration_eqn[t], sf_pv)