Coverage for backend/django/Economics/costing/cost_curves/driver_specs.py: 85%

220 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1"""Typed driver-input and variant contracts for economics cost curves.""" 

2 

3from __future__ import annotations 

4 

5from decimal import Decimal 

6import re 

7from typing import Any, Iterable, Literal, TypeAlias 

8 

9from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, field_validator, model_validator 

10 

11from Economics.costing.cost_curves.evaluation import normalize_economics_unit_notation 

12from Economics.costing.cost_curves.unit_options import cost_curve_input_unit_options 

13 

14UnitOptionPayload: TypeAlias = dict[str, str] 

15CostCurveDriverSpecPayload: TypeAlias = dict[str, str | bool | list[str] | None] 

16CostCurveDriverSpecReadPayload: TypeAlias = dict[ 

17 str, str | bool | list[str] | list[UnitOptionPayload] | None 

18] 

19CapitalCostDriverInputPayload: TypeAlias = dict[str, str | int | None] 

20CapitalCostDriverInputsPayload: TypeAlias = dict[str, CapitalCostDriverInputPayload] 

21CostCurveDiscreteVariantPayload: TypeAlias = dict[str, str | dict[str, str]] 

22 

23 

24CostCurveDriverRole = Literal["formula_input", "discrete_selector"] 

25CostCurveDriverSource = Literal["property", "manual"] 

26 

27 

class CostCurveDriverSpec(BaseModel):
    """Declarative description of a single input needed to size a cost curve.

    Specs are persisted as JSON so that templates and user-authored curves
    stay declarative; every read and write is normalized through this model
    before the JSON reaches the API or generated capital-line payloads.

    Instances are frozen and reject unknown fields.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    key: str
    label: str
    unit: str
    role: CostCurveDriverRole
    variable_symbol: str = ""
    required: bool = True
    primary: bool = False
    valid_min: str = ""
    valid_max: str = ""
    valid_range_note: str = ""
    soft_maximum_adjustment_percent: str = ""
    default_manual_value: str = ""
    source_options: tuple[CostCurveDriverSource, ...] = Field(
        default=("property", "manual"),
        min_length=1,
    )

    @field_validator("key", "label", "unit")
    @classmethod
    def _required_text(cls, value: str) -> str:
        """Strip the text and reject values that end up empty."""
        text = str(value or "").strip()
        if text:
            return text
        raise ValueError("Required driver spec fields cannot be blank.")

    @field_validator(
        "variable_symbol",
        "valid_min",
        "valid_max",
        "valid_range_note",
        "soft_maximum_adjustment_percent",
        "default_manual_value",
        mode="before",
    )
    @classmethod
    def _optional_text(cls, value: Any) -> str:
        """Coerce optional input to stripped text, mapping ``None`` to ``""``."""
        if value is None:
            return ""
        return str(value).strip()

    @field_validator("unit")
    @classmethod
    def _normalize_unit(cls, value: str) -> str:
        """Pass the unit through the shared economics unit normalizer."""
        return normalize_economics_unit_notation(value)

    @model_validator(mode="after")
    def _validate_role_fields(self) -> "CostCurveDriverSpec":
        """Check role/symbol consistency and that numeric text fields parse.

        Raises:
            ValueError: if a formula input has no ``variable_symbol``, if a
                discrete selector defines one, or if a non-empty numeric text
                field is rejected by ``_parse_decimal_text``.
        """
        has_symbol = bool(self.variable_symbol)
        if self.role == "formula_input" and not has_symbol:
            raise ValueError("Formula input driver specs require variable_symbol.")
        if self.role == "discrete_selector" and has_symbol:
            raise ValueError("Discrete selector driver specs cannot define variable_symbol.")

        # Empty text means "not provided" and is skipped.
        numeric_text_fields = ("valid_min", "valid_max", "soft_maximum_adjustment_percent")
        for name in numeric_text_fields:
            text = getattr(self, name)
            if text:
                _parse_decimal_text(text, field_name=name)
        return self

92 

93 

class CostCurveDriverSpecRead(CostCurveDriverSpec):
    """Driver spec as returned on reads, extended with ``unit_options``.

    The schema title is set to ``CostCurveDriverSpec``; the model stays frozen
    and rejects unknown fields like its parent.
    """

    model_config = ConfigDict(
        frozen=True,
        extra="forbid",
        title="CostCurveDriverSpec",
    )

    # Unit choices supplied by the backend; empty unless provided.
    unit_options: list[UnitOptionPayload] = Field(default_factory=list)

100 

101 

class CostCurveDiscreteVariant(BaseModel):
    """A single published curve candidate within a discrete-family cost curve.

    Instances are frozen and reject unknown fields.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    key: str
    label: str
    selector_values: dict[str, str]
    expression_text: str
    valid_min: str = ""
    valid_max: str = ""
    valid_range_note: str = ""
    source_reference: str = ""
    notes: str = ""

    @field_validator("key", "label", "expression_text")
    @classmethod
    def _required_text(cls, value: str) -> str:
        """Strip the text and reject values that end up empty."""
        text = str(value or "").strip()
        if text:
            return text
        raise ValueError("Required discrete variant fields cannot be blank.")

    @field_validator(
        "valid_min",
        "valid_max",
        "valid_range_note",
        "source_reference",
        "notes",
        mode="before",
    )
    @classmethod
    def _optional_text(cls, value: Any) -> str:
        """Coerce optional input to stripped text, mapping ``None`` to ``""``."""
        if value is None:
            return ""
        return str(value).strip()

    @field_validator("selector_values")
    @classmethod
    def _selector_values(cls, value: dict[str, Any]) -> dict[str, str]:
        """Strip selector names and values and require each value to be numeric.

        Raises:
            ValueError: if a selector name or value is blank after stripping,
                or if a value is rejected by ``_parse_decimal_text``.
        """
        cleaned: dict[str, str] = {}
        for raw_name, raw_text in value.items():
            name = str(raw_name or "").strip()
            text = str(raw_text or "").strip()
            if not (name and text):
                raise ValueError("Discrete variant selector values cannot be blank.")
            # Only validated here; the stripped text is what gets stored.
            _parse_decimal_text(text, field_name=f"selector_values.{name}")
            cleaned[name] = text
        return cleaned

    @model_validator(mode="after")
    def _validate_optional_numbers(self) -> "CostCurveDiscreteVariant":
        """Require non-empty ``valid_min`` / ``valid_max`` to parse as numbers."""
        for name in ("valid_min", "valid_max"):
            text = getattr(self, name)
            if text:
                _parse_decimal_text(text, field_name=name)
        return self

150 

151 

class CapitalCostDriverInput(BaseModel):
    """One persisted property/manual value selection for a capital-line driver.

    Instances are frozen and reject unknown fields. ``unit`` is the only
    field without a default.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    source: Literal["", "property", "manual"] = ""
    property_info: int | None = None
    manual_value: str = ""
    unit: str

    @classmethod
    def from_spec_default(cls, spec: CostCurveDriverSpec) -> "CapitalCostDriverInput":
        """Create the blank/default capital-line input for a declared spec.

        When the spec allows manual entry and declares a non-empty
        ``default_manual_value``, the input is pre-filled as a manual value.
        In every other case the input starts blank (empty source, no property,
        empty manual value). The spec's unit is always carried over.
        """
        # ``default_manual_value`` is normalized to ``str`` by the spec model,
        # so a truthiness test is equivalent to comparing against None and "".
        if "manual" in spec.source_options and spec.default_manual_value:
            return cls(
                source="manual",
                property_info=None,
                manual_value=spec.default_manual_value,
                unit=spec.unit,
            )
        # Manual-only specs without a default also start blank; the former
        # dedicated branch for them returned exactly this value.
        return cls(
            source="",
            property_info=None,
            manual_value="",
            unit=spec.unit,
        )

    @field_validator("manual_value", mode="before")
    @classmethod
    def _manual_value_text(cls, value: Any) -> str:
        """Coerce the manual value to stripped text, mapping ``None`` to ``""``."""
        return "" if value is None else str(value).strip()

    @field_validator("unit")
    @classmethod
    def _input_unit(cls, value: str) -> str:
        """Require a non-blank unit and normalize its notation.

        Raises:
            ValueError: if the unit is blank after stripping.
        """
        value = str(value or "").strip()
        if not value:
            raise ValueError("Capital cost driver input unit cannot be blank.")
        return normalize_economics_unit_notation(value)

193 

194 

# Module-level adapters, built once at import time and used by the parse and
# normalize helpers below to validate whole JSON containers.
_DRIVER_SPECS_ADAPTER = TypeAdapter(list[CostCurveDriverSpec])
# Keyed by driver spec key.
_DRIVER_INPUTS_ADAPTER = TypeAdapter(dict[str, CapitalCostDriverInput])
_DISCRETE_VARIANTS_ADAPTER = TypeAdapter(list[CostCurveDiscreteVariant])

198 

199 

def parse_required_driver_specs(specs: Any) -> tuple[CostCurveDriverSpec, ...]:
    """Validate raw driver specs and return them as typed models.

    ``None`` and ``""`` yield an empty tuple. Otherwise missing keys are
    filled in, each entry is validated, and the list-wide rules are checked
    here because a single-spec model cannot see its siblings: unique keys
    (so ``CapitalCostLine.driver_inputs`` is keyed deterministically), unique
    formula variable symbols, primary only on formula inputs, at least one
    formula input, and exactly one primary.

    Raises:
        ValueError: if any list-wide rule is violated.
    """
    if specs in (None, ""):
        return ()

    typed_specs = tuple(
        _DRIVER_SPECS_ADAPTER.validate_python(_specs_with_generated_keys(specs))
    )

    keys_so_far: set[str] = set()
    symbols_so_far: set[str] = set()
    formula_inputs = 0
    primaries = 0
    for item in typed_specs:
        if item.key in keys_so_far:
            raise ValueError(f"Driver spec key `{item.key}` is duplicated.")
        keys_so_far.add(item.key)

        if item.role != "formula_input":
            if item.primary:
                raise ValueError("Only formula input driver specs can be primary.")
            continue

        formula_inputs += 1
        if item.variable_symbol in symbols_so_far:
            raise ValueError(f"Formula variable `{item.variable_symbol}` is duplicated.")
        symbols_so_far.add(item.variable_symbol)
        if item.primary:
            primaries += 1

    if not formula_inputs:
        raise ValueError("Cost curves require at least one formula input driver spec.")
    if primaries != 1:
        raise ValueError("Cost curves require exactly one primary formula input driver spec.")
    return typed_specs

233 

234 

def normalize_required_driver_specs(specs: Any) -> list[CostCurveDriverSpecPayload]:
    """Validate raw driver specs and return them as JSON-ready dicts."""
    typed_specs = parse_required_driver_specs(specs)
    return [driver_spec_payload(typed_spec) for typed_spec in typed_specs]

238 

239 

240def _specs_with_generated_keys(specs: Any) -> Any: 

241 """Fill missing spec keys at the API boundary. 

242 

243 Key values are internal identifiers for stored capital-line driver inputs. 

244 User-authored payloads do not need to provide them, but existing template 

245 and stored keys are preserved exactly so saved selections stay stable. 

246 """ 

247 if not isinstance(specs, list): 247 ↛ 248line 247 didn't jump to line 248 because the condition on line 247 was never true

248 return specs 

249 seen_keys: set[str] = set() 

250 normalized_specs: list[Any] = [] 

251 for index, spec in enumerate(specs): 

252 if not isinstance(spec, dict): 252 ↛ 253line 252 didn't jump to line 253 because the condition on line 252 was never true

253 normalized_specs.append(spec) 

254 continue 

255 next_spec = {**spec} 

256 next_spec.pop("unit_options", None) 

257 key = str(next_spec.get("key") or "").strip() 

258 if not key: 

259 key = _generated_driver_spec_key(next_spec, index=index, seen_keys=seen_keys) 

260 next_spec["key"] = key 

261 seen_keys.add(key) 

262 normalized_specs.append(next_spec) 

263 return normalized_specs 

264 

265 

266def _generated_driver_spec_key( 

267 spec: dict[str, Any], *, index: int, seen_keys: set[str] 

268) -> str: 

269 base = ( 

270 _slug_value(str(spec.get("label") or "")) 

271 or _slug_value(str(spec.get("variable_symbol") or "")) 

272 or f"input_{index + 1}" 

273 ) 

274 candidate = base 

275 suffix = 2 

276 while candidate in seen_keys: 

277 candidate = f"{base}_{suffix}" 

278 suffix += 1 

279 return candidate 

280 

281 

282def _slug_value(value: str) -> str: 

283 return re.sub( 

284 r"(^_+|_+$)", 

285 "", 

286 re.sub(r"[^a-z0-9]+", "_", value.strip().lower()), 

287 ) 

288 

289 

def parse_discrete_variants(variants: Any) -> tuple[CostCurveDiscreteVariant, ...]:
    """Validate raw discrete-family variant rows and return typed models.

    ``None`` and ``""`` yield an empty tuple.

    Raises:
        ValueError: if two variants share the same key.
    """
    if variants in (None, ""):
        return ()

    typed_variants = tuple(_DISCRETE_VARIANTS_ADAPTER.validate_python(variants))
    keys_so_far: set[str] = set()
    for row in typed_variants:
        if row.key in keys_so_far:
            raise ValueError(f"Discrete variant key `{row.key}` is duplicated.")
        keys_so_far.add(row.key)
    return typed_variants

301 

302 

def normalize_discrete_variants(variants: Any) -> list[CostCurveDiscreteVariantPayload]:
    """Validate raw variant rows and return them as dicts for JSON storage."""
    typed_variants = parse_discrete_variants(variants)
    return [discrete_variant_payload(row) for row in typed_variants]

306 

307 

def normalize_capital_cost_driver_inputs(
    inputs: Any,
) -> CapitalCostDriverInputsPayload:
    """Validate keyed capital-line driver inputs and return them JSON-ready.

    `CapitalCostLine.driver_inputs` is a JSON object keyed by required driver
    spec key. Routing it through Pydantic gives the API a precise schema
    instead of accepting arbitrary JSON on writes. ``None`` and ``""`` yield
    an empty dict.
    """
    if inputs in (None, ""):
        return {}

    typed_inputs = _DRIVER_INPUTS_ADAPTER.validate_python(inputs)
    payload: CapitalCostDriverInputsPayload = {}
    for spec_key, typed_input in typed_inputs.items():
        payload[spec_key] = capital_cost_driver_input_payload(typed_input)
    return payload

323 

324 

def driver_specs_payload(
    specs: Iterable[CostCurveDriverSpec],
) -> list[CostCurveDriverSpecPayload]:
    """Serialize already-validated spec models to JSON-ready dicts, in order."""
    payloads: list[CostCurveDriverSpecPayload] = []
    for typed_spec in specs:
        payloads.append(driver_spec_payload(typed_spec))
    return payloads

330 

331 

def driver_specs_read_payload(
    specs: Iterable[CostCurveDriverSpec],
) -> list[CostCurveDriverSpecReadPayload]:
    """Serialize spec models for API reads, each with unit options attached."""
    payloads: list[CostCurveDriverSpecReadPayload] = []
    for typed_spec in specs:
        payloads.append(driver_spec_read_payload(typed_spec))
    return payloads

337 

338 

def default_driver_inputs_payload(
    specs: Iterable[CostCurveDriverSpec],
) -> CapitalCostDriverInputsPayload:
    """Return JSON-ready default capital-line inputs, keyed by spec key."""
    defaults: CapitalCostDriverInputsPayload = {}
    for typed_spec in specs:
        default_input = CapitalCostDriverInput.from_spec_default(typed_spec)
        defaults[typed_spec.key] = capital_cost_driver_input_payload(default_input)
    return defaults

349 

350 

def driver_spec_payload(spec: CostCurveDriverSpec) -> CostCurveDriverSpecPayload:
    """Serialize one typed driver spec to its stored JSON shape.

    Fields are read attribute by attribute so the payload keeps a precise
    value type instead of widening to ``Any``. ``source_options`` is emitted
    as a list.
    """
    identity: CostCurveDriverSpecPayload = {
        "key": spec.key,
        "label": spec.label,
        "role": spec.role,
        "variable_symbol": spec.variable_symbol,
        "unit": spec.unit,
        "required": spec.required,
        "primary": spec.primary,
    }
    limits_and_defaults: CostCurveDriverSpecPayload = {
        "valid_min": spec.valid_min,
        "valid_max": spec.valid_max,
        "valid_range_note": spec.valid_range_note,
        "soft_maximum_adjustment_percent": spec.soft_maximum_adjustment_percent,
        "default_manual_value": spec.default_manual_value,
    }
    sources: CostCurveDriverSpecPayload = {"source_options": list(spec.source_options)}
    # Merging left to right keeps the stored key order unchanged.
    return identity | limits_and_defaults | sources

368 

369 

def driver_spec_read_payload(spec: CostCurveDriverSpec) -> CostCurveDriverSpecReadPayload:
    """Serialize one driver spec for API reads, leaving stored JSON unchanged.

    The stored payload is combined with the unit options looked up for the
    spec's unit, validated through the read model, and dumped in JSON mode.
    """
    stored_fields = driver_spec_payload(spec)
    unit_choices = cost_curve_input_unit_options(spec.unit)
    read_model = CostCurveDriverSpecRead(**stored_fields, unit_options=unit_choices)
    return read_model.model_dump(mode="json")

376 

377 

def discrete_variant_payload(variant: CostCurveDiscreteVariant) -> CostCurveDiscreteVariantPayload:
    """Serialize one typed discrete variant to its stored JSON shape.

    Fields are read attribute by attribute so the payload keeps a precise
    value type instead of widening to ``Any``.
    """
    definition: CostCurveDiscreteVariantPayload = {
        "key": variant.key,
        "label": variant.label,
        "selector_values": variant.selector_values,
        "expression_text": variant.expression_text,
    }
    validity_and_provenance: CostCurveDiscreteVariantPayload = {
        "valid_min": variant.valid_min,
        "valid_max": variant.valid_max,
        "valid_range_note": variant.valid_range_note,
        "source_reference": variant.source_reference,
        "notes": variant.notes,
    }
    # Merging left to right keeps the stored key order unchanged.
    return definition | validity_and_provenance

391 

392 

def capital_cost_driver_input_payload(
    driver_input: CapitalCostDriverInput,
) -> CapitalCostDriverInputPayload:
    """Serialize one typed capital-line driver input to its stored JSON shape.

    Each field is read through typed attribute access rather than untyped
    dict access.
    """
    payload: CapitalCostDriverInputPayload = {
        "source": driver_input.source,
        "property_info": driver_input.property_info,
    }
    payload["manual_value"] = driver_input.manual_value
    payload["unit"] = driver_input.unit
    return payload

403 

404 

405def _parse_decimal_text(value: str, *, field_name: str) -> Decimal: 

406 try: 

407 decimal_value = Decimal(str(value)) 

408 except Exception as exc: 

409 raise ValueError(f"{field_name} must be a number.") from exc 

410 if not decimal_value.is_finite(): 410 ↛ 411line 410 didn't jump to line 411 because the condition on line 410 was never true

411 raise ValueError(f"{field_name} must be finite.") 

412 return decimal_value