Coverage for backend/django/Economics/reference_data/services/stats_nz_indexes.py: 92%

194 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1"""Import and use global Stats NZ economics index data. 

2 

3The locked CPI, CGPI, and PMEI fixture excerpts are platform reference data, 

4not flowsheet-owned user data. The management command and service functions 

5therefore populate one global series/value set that any flowsheet-scoped 

6Economics study can reference through its assumptions. Service return values are 

7Pydantic contracts so import diagnostics and escalation results have stable, 

8serializable shapes at the JSON-field/API boundary. 

9""" 

10 

11from __future__ import annotations 

12 

13import csv 

14import zipfile 

15from calendar import monthrange 

16from datetime import date 

17from decimal import Decimal, InvalidOperation, ROUND_HALF_UP 

18from pathlib import Path 

19from typing import Any, Iterable 

20 

21from django.db import transaction 

22from pydantic import BaseModel, ConfigDict 

23 

24from Economics.reference_data.models import CostIndexSeries, CostIndexValue 

25 

26 

# Provider label written to, and filtered on, CostIndexSeries rows handled by
# this module.
STATS_NZ_PROVIDER = "Stats NZ"
# Quantum (eight decimal places) that parsed index values are rounded to
# before they are stored on CostIndexValue.
INDEX_VALUE_QUANTUM = Decimal("0.00000001")

29 

30 

class EconomicsContract(BaseModel):
    """Shared base for the Pydantic contracts declared in this module.

    The model is configured with ``frozen=True``, so instances are treated as
    immutable value objects once constructed.
    """

    model_config = ConfigDict(frozen=True)

33 

34 

class StatsNzIndexSeriesSpec(EconomicsContract):
    """Static description of one locked Stats NZ index series.

    The importer uses a spec both to create/update the ``CostIndexSeries`` row
    and to decide which source CSV rows belong to that series.
    """

    # Key of the CostIndexSeries row (also the lookup key used for escalation).
    key: str
    # Human-readable series name stored on the series row.
    name: str
    # Stats NZ series identifier; compared with the CSV "Series_reference" column.
    source_series_id: str
    # Provenance metadata copied onto the series row.
    source_url: str
    release_title: str
    source_asset_filename: str
    source_asset_file_id: str
    source_parent_id: str
    # Textual statement of the index base period, stored on the series row.
    index_basis: str
    # Expected values of the CSV "Subject", "Group", and "Series_title_1"
    # columns for rows belonging to this series.
    subject: str
    group: str
    series_title_1: str

48 

49 

class StatsNzImportSource(EconomicsContract):
    """One source file paired with the series specs to extract from it."""

    # Location of the CSV (or ZIP containing a single CSV) to read.
    path: Path
    # Series that must each be found in the rows of ``path``.
    specs: tuple[StatsNzIndexSeriesSpec, ...]

53 

54 

class StatsNzImportResult(EconomicsContract):
    """Summary counters returned by ``import_stats_nz_indexes``."""

    # Number of distinct series keys processed.
    series_imported: int
    # Index values that did not exist before the import.
    values_created: int
    # Index values that already existed and were updated in place.
    values_updated: int
    # Total rows read across all source files (matching or not).
    rows_seen: int
    # Rows that matched a spec and were written as index values.
    rows_imported: int
    # Latest imported period string for each series key.
    latest_period_by_series: dict[str, str]

62 

63 

class IndexEscalationWarning(EconomicsContract):
    """Non-fatal diagnostic attached to an escalation result."""

    # Machine-readable warning identifier (e.g. "index_value_not_final").
    code: str
    # Severity label; this module only emits "warning".
    severity: str
    # Human-readable explanation.
    message: str
    # Structured details about the value that triggered the warning.
    context: dict[str, Any]

69 

70 

class IndexEscalationResult(EconomicsContract):
    """Outcome of escalating a historical cost with a global index series.

    Carries the inputs, both index values, the ratio between them, the
    escalated cost, and any warnings raised along the way.
    """

    series_key: str
    historical_cost: Decimal
    historical_period: str
    current_period: str
    historical_index: Decimal
    current_index: Decimal
    index_ratio: Decimal
    current_cost: Decimal
    warnings: tuple[IndexEscalationWarning, ...]

    def warnings_payload(self) -> list[dict[str, Any]]:
        """Return the warnings as JSON-mode dictionaries, in their stored order."""
        payload: list[dict[str, Any]] = []
        for item in self.warnings:
            payload.append(item.model_dump(mode="json"))
        return payload

85 

86 

class StatsNzIndexImportError(ValueError):
    """Structured failure raised while reading or importing Stats NZ index data.

    Attributes:
        code: Machine-readable error identifier.
        message: Human-readable description (also the exception's ``str``).
        context: Extra diagnostic details; an empty dict when none were given.
    """

    def __init__(self, code: str, message: str, *, context: dict[str, Any] | None = None):
        super().__init__(message)
        self.code = code
        self.message = message
        self.context = context or {}

    def __reduce__(self):
        """Support pickling and copying despite the two-argument constructor.

        The default ``BaseException`` reduction rebuilds the instance from
        ``self.args`` (just the message), which does not satisfy ``__init__``
        and fails with ``TypeError``. Rebuild from ``code`` and ``message``
        instead, then restore the remaining attributes from the instance dict.
        """
        return (type(self), (self.code, self.message), dict(vars(self)))

93 

94 

class IndexEscalationError(ValueError):
    """Structured failure raised when a cost cannot be escalated with an index.

    Attributes:
        code: Machine-readable error identifier.
        message: Human-readable description (also the exception's ``str``).
        context: Extra diagnostic details; an empty dict when none were given.
    """

    def __init__(self, code: str, message: str, *, context: dict[str, Any] | None = None):
        super().__init__(message)
        self.code = code
        self.message = message
        self.context = context or {}

    def __reduce__(self):
        """Support pickling and copying despite the two-argument constructor.

        The default ``BaseException`` reduction rebuilds the instance from
        ``self.args`` (just the message), which does not satisfy ``__init__``
        and fails with ``TypeError``. Rebuild from ``code`` and ``message``
        instead, then restore the remaining attributes from the instance dict.
        """
        return (type(self), (self.code, self.message), dict(vars(self)))

101 

102 

# Download URL recorded as provenance for the CPI series (a CSV asset).
CPI_INDEX_SOURCE_URL = (
    "https://www.stats.govt.nz/assets/Uploads/Consumers-price-index/"
    "Consumers-price-index-March-2026-quarter/Download-data/"
    "consumers-price-index-march-2026-quarter-index-numbers.csv"
)
# Download URL recorded as provenance for the CGPI and PMEI series (a ZIP asset).
BPI_INDEX_SOURCE_URL = (
    "https://www.stats.govt.nz/assets/Uploads/Business-price-indexes/"
    "Business-price-indexes-March-2026-quarter/Download-data/"
    "business-price-indexes-march-2026-quarter.zip"
)

113 

114 

# Locked spec for the consumers price index, all groups (series CPIQ.SE9A).
STATS_NZ_CPI_ALL_GROUPS = StatsNzIndexSeriesSpec(
    key="stats_nz_cpi_all_groups",
    name="Stats NZ CPI all groups for New Zealand",
    source_series_id="CPIQ.SE9A",
    source_url=CPI_INDEX_SOURCE_URL,
    release_title="Consumers price index: March 2026 quarter",
    source_asset_filename="consumers-price-index-march-2026-quarter-index-numbers.csv",
    source_asset_file_id="122911",
    source_parent_id="122663",
    index_basis="June 2017 quarter = 1000",
    subject="CPI",
    group="CPI All Groups for New Zealand",
    series_title_1="All groups",
)
# Locked spec for the capital goods price index, all groups (series CEPQ.S611).
STATS_NZ_CGPI_ALL_GROUPS = StatsNzIndexSeriesSpec(
    key="stats_nz_cgpi_all_groups",
    name="Stats NZ capital goods price index all groups",
    source_series_id="CEPQ.S611",
    source_url=BPI_INDEX_SOURCE_URL,
    release_title="Business price indexes: March 2026 quarter",
    source_asset_filename="business-price-indexes-march-2026-quarter.zip",
    source_asset_file_id="123118",
    source_parent_id="122942",
    index_basis="September quarter 2022 = 1000",
    subject="Capital Goods Price Index - CEP",
    group="Price Index all groups of capital goods (Base: September quarter 2022 = 1000)",
    series_title_1="All Groups",
)
# Locked spec for the plant, machinery, and equipment index (series CEPQ.S61106).
STATS_NZ_PMEI = StatsNzIndexSeriesSpec(
    key="stats_nz_pmei",
    name="Stats NZ plant, machinery, and equipment index",
    source_series_id="CEPQ.S61106",
    source_url=BPI_INDEX_SOURCE_URL,
    release_title="Business price indexes: March 2026 quarter",
    source_asset_filename="business-price-indexes-march-2026-quarter.zip",
    source_asset_file_id="123118",
    source_parent_id="122942",
    index_basis="September quarter 2022 = 1000",
    subject="Capital Goods Price Index - CEP",
    group="Price Index asset types of capital goods (Base: September quarter 2022 = 1000)",
    series_title_1="Plant, Machinery, and Equipment",
)
# All locked specs, plus a lookup by series key used when resolving a series
# for escalation.
STATS_NZ_INDEX_SPECS = (
    STATS_NZ_CPI_ALL_GROUPS,
    STATS_NZ_CGPI_ALL_GROUPS,
    STATS_NZ_PMEI,
)
STATS_NZ_INDEX_SPECS_BY_KEY = {spec.key: spec for spec in STATS_NZ_INDEX_SPECS}

163 

164 

def default_stats_nz_fixture_dir() -> Path:
    """Return the ``fixtures/stats_nz_indexes`` directory two levels above this module's folder."""
    module_file = Path(__file__).resolve()
    # parents[2] is the third ancestor directory of this file.
    return module_file.parents[2].joinpath("fixtures", "stats_nz_indexes")

168 

169 

def import_locked_stats_nz_indexes(
    *,
    cpi_path: Path | str | None = None,
    bpi_path: Path | str | None = None,
) -> StatsNzImportResult:
    """Import the three locked Stats NZ series from their source files.

    Args:
        cpi_path: File holding the CPI series. Defaults to the CPI spec's
            ``source_asset_filename`` inside the default fixture directory.
        bpi_path: File holding the CGPI and PMEI series. Defaults to
            ``stats_nz_bpi_cgpi_pmei_excerpt.csv`` inside the default fixture
            directory.

    Returns:
        The summary produced by ``import_stats_nz_indexes``.
    """
    fixtures = default_stats_nz_fixture_dir()

    if cpi_path is None:
        cpi_file = fixtures / STATS_NZ_CPI_ALL_GROUPS.source_asset_filename
    else:
        cpi_file = Path(cpi_path)

    if bpi_path is None:
        bpi_file = fixtures / "stats_nz_bpi_cgpi_pmei_excerpt.csv"
    else:
        bpi_file = Path(bpi_path)

    cpi_source = StatsNzImportSource(path=cpi_file, specs=(STATS_NZ_CPI_ALL_GROUPS,))
    bpi_source = StatsNzImportSource(path=bpi_file, specs=(STATS_NZ_CGPI_ALL_GROUPS, STATS_NZ_PMEI))
    return import_stats_nz_indexes(sources=(cpi_source, bpi_source))

185 

186 

def import_stats_nz_indexes(
    *,
    sources: Iterable[StatsNzImportSource],
) -> StatsNzImportResult:
    """Upsert series and index values from each source inside one transaction.

    Series and values are written with ``update_or_create``, so importing the
    same rows again updates the existing records instead of duplicating them.

    Args:
        sources: Source files together with the specs expected in each.

    Returns:
        Counters describing what was read, created, and updated.

    Raises:
        StatsNzIndexImportError: If a source cannot be read, a matching row is
            invalid, or a spec matches no row in its source
            (``locked_series_missing_from_source``).
    """
    created_total = 0
    updated_total = 0
    seen_total = 0
    imported_total = 0
    latest_periods: dict[str, str] = {}
    series_keys: set[str] = set()

    with transaction.atomic():
        for source in sources:
            source_rows = _read_source_rows(source.path)
            seen_total += len(source_rows)

            for spec in source.specs:
                series = _upsert_series(spec=spec)
                series_keys.add(spec.key)

                # Each outcome is the (CostIndexValue, created) pair for one matching row.
                outcomes = [
                    _upsert_value(series=series, spec=spec, row=candidate)
                    for candidate in source_rows
                    if _row_matches_spec(candidate, spec)
                ]
                if not outcomes:
                    raise StatsNzIndexImportError(
                        "locked_series_missing_from_source",
                        "Stats NZ source did not contain the locked series.",
                        context={"series_key": spec.key, "source_series_id": spec.source_series_id, "path": str(source.path)},
                    )

                newly_created = sum(1 for _, was_created in outcomes if was_created)
                imported_total += len(outcomes)
                created_total += newly_created
                updated_total += len(outcomes) - newly_created

                # Record the most recent period on the series row itself.
                stored_values = [stored for stored, _ in outcomes]
                newest = max(stored_values, key=lambda stored: stored.period_date)
                series.latest_imported_period = newest.period
                series.save(update_fields=["latest_imported_period", "updated_at"])
                latest_periods[series.key] = newest.period

    return StatsNzImportResult(
        series_imported=len(series_keys),
        values_created=created_total,
        values_updated=updated_total,
        rows_seen=seen_total,
        rows_imported=imported_total,
        latest_period_by_series=latest_periods,
    )

242 

243 

def escalate_cost_with_index(
    *,
    series_key: str,
    historical_cost: Decimal | int | str,
    historical_period: str,
    current_period: str,
) -> IndexEscalationResult:
    """Scale a historical cost by the ratio of two imported index values.

    The result is ``historical_cost * (current index / historical index)``,
    computed with ``Decimal`` arithmetic.

    Args:
        series_key: Key of the global index series to use.
        historical_cost: Cost expressed at ``historical_period`` prices.
        historical_period: Period string of the cost's original price level.
        current_period: Period string to escalate the cost to.

    Returns:
        The inputs, both index values, the ratio, the escalated cost, and any
        warnings.

    Raises:
        StatsNzIndexImportError: If ``historical_cost`` is not a finite decimal.
        IndexEscalationError: If the series or either period is not imported,
            or the historical index value is zero.
    """
    parsed_cost = _parse_decimal(historical_cost, field_name="historical_cost")

    series = _get_locked_index_series(series_key)
    if series is None:
        raise IndexEscalationError(
            "index_series_missing",
            "Requested global index series is not imported.",
            context={"series_key": series_key},
        )

    base_value = _get_index_value(series=series, period=historical_period, role="historical")
    target_value = _get_index_value(series=series, period=current_period, role="current")

    # Guard the division below.
    if base_value.value == 0:
        raise IndexEscalationError(
            "historical_index_zero",
            "Historical index value is zero and cannot be used for escalation.",
            context={"series_key": series_key, "historical_period": historical_period},
        )

    ratio = target_value.value / base_value.value
    escalated_cost = parsed_cost * ratio
    found_warnings = _escalation_warnings(
        series=series,
        historical_value=base_value,
        current_value=target_value,
    )
    return IndexEscalationResult(
        series_key=series.key,
        historical_cost=parsed_cost,
        historical_period=base_value.period,
        current_period=target_value.period,
        historical_index=base_value.value,
        current_index=target_value.value,
        index_ratio=ratio,
        current_cost=escalated_cost,
        warnings=tuple(found_warnings),
    )

284 

285 

def parse_stats_nz_quarter_period(period: str) -> date:
    """Convert a Stats NZ ``YYYY.MM`` quarter period into its month-end date.

    Args:
        period: Period text such as ``"2026.03"``; the month must be a
            quarter-ending month (3, 6, 9, or 12).

    Returns:
        The last calendar day of the given month.

    Raises:
        StatsNzIndexImportError: With code ``invalid_period`` when the text is
            not two integers separated by ``.`` or the year cannot be
            represented by ``date``; with code ``invalid_quarter_month`` when
            the month is not a quarter-ending month.
    """
    try:
        year_text, month_text = period.split(".", maxsplit=1)
        year = int(year_text)
        month = int(month_text)
    except ValueError as exc:
        raise StatsNzIndexImportError(
            "invalid_period",
            "Stats NZ period must use YYYY.MM format.",
            context={"period": period},
        ) from exc

    if month not in (3, 6, 9, 12):
        raise StatsNzIndexImportError(
            "invalid_quarter_month",
            "Stats NZ quarterly index period must end in March, June, September, or December.",
            context={"period": period, "month": month},
        )

    try:
        return date(year, month, monthrange(year, month)[1])
    except ValueError as exc:
        # date() rejects years outside its supported range (e.g. "0000.03");
        # report that as a structured import error rather than a bare ValueError.
        raise StatsNzIndexImportError(
            "invalid_period",
            "Stats NZ period must use YYYY.MM format.",
            context={"period": period},
        ) from exc

305 

306 

def _read_source_rows(path: Path) -> list[dict[str, str]]:
    """Load every CSV row from a plain CSV file or a ZIP holding one CSV.

    Text is decoded as ``utf-8-sig``, so a leading byte-order mark is dropped.

    Raises:
        StatsNzIndexImportError: If ``path`` does not exist
            (``source_file_missing``) or a ZIP does not contain exactly one
            ``.csv`` member (``zip_csv_member_ambiguous``).
    """
    if not path.exists():
        raise StatsNzIndexImportError(
            "source_file_missing",
            "Stats NZ source fixture was not found.",
            context={"path": str(path)},
        )

    # Plain CSV: read it directly.
    if path.suffix.lower() != ".zip":
        with path.open(newline="", encoding="utf-8-sig") as handle:
            return list(csv.DictReader(handle))

    # ZIP archive: require a single CSV member and decode it line by line.
    with zipfile.ZipFile(path) as archive:
        csv_members = [member for member in archive.namelist() if member.lower().endswith(".csv")]
        if len(csv_members) != 1:
            raise StatsNzIndexImportError(
                "zip_csv_member_ambiguous",
                "Stats NZ BPI ZIP must contain exactly one CSV member for v1 import.",
                context={"path": str(path), "csv_members": csv_members},
            )
        (only_member,) = csv_members
        with archive.open(only_member) as raw_stream:
            decoded_lines = (raw_line.decode("utf-8-sig") for raw_line in raw_stream)
            return list(csv.DictReader(decoded_lines))

329 

330 

def _upsert_series(*, spec: StatsNzIndexSeriesSpec) -> CostIndexSeries:
    """Create or refresh the ``CostIndexSeries`` row described by ``spec``.

    The row is looked up by key, provider, and source series id; the remaining
    descriptive fields are written as defaults.
    """
    descriptive_fields = {
        "name": spec.name,
        "frequency": "quarterly",
        "unit": "Index",
        "index_basis": spec.index_basis,
        "source_url": spec.source_url,
        "release_title": spec.release_title,
        "source_asset_filename": spec.source_asset_filename,
        "source_asset_file_id": spec.source_asset_file_id,
        "source_parent_id": spec.source_parent_id,
    }
    # update_or_create returns (object, created); only the object is needed.
    return CostIndexSeries.objects.update_or_create(
        key=spec.key,
        provider=STATS_NZ_PROVIDER,
        source_series_id=spec.source_series_id,
        defaults=descriptive_fields,
    )[0]

349 

350 

def _get_locked_index_series(series_key: str) -> CostIndexSeries | None:
    """Return the first series stored under ``series_key``, or ``None``.

    When the key belongs to a locked Stats NZ spec, the lookup is narrowed to
    the Stats NZ provider and that spec's source series id.
    """
    candidates = CostIndexSeries.objects.filter(key=series_key)
    locked_spec = STATS_NZ_INDEX_SPECS_BY_KEY.get(series_key)
    if locked_spec is None:
        return candidates.first()
    return candidates.filter(
        provider=STATS_NZ_PROVIDER,
        source_series_id=locked_spec.source_series_id,
    ).first()

357 

358 

def _upsert_value(
    *,
    series: CostIndexSeries,
    spec: StatsNzIndexSeriesSpec,
    row: dict[str, str],
) -> tuple[CostIndexValue, bool]:
    """Create or update the ``CostIndexValue`` for one source row.

    Only rows whose ``STATUS`` is ``FINAL`` are accepted. The value is rounded
    half-up to ``INDEX_VALUE_QUANTUM`` before it is stored.

    Returns:
        The stored value and whether it was newly created.

    Raises:
        StatsNzIndexImportError: If the status is not ``FINAL``, a required
            field is missing, the period is invalid, or the value is not a
            finite decimal.
    """
    row_status = row.get("STATUS", "")
    if row_status != "FINAL":
        raise StatsNzIndexImportError(
            "unsupported_index_status",
            "Stats NZ index importer only accepts FINAL rows in v1.",
            context={"series_key": spec.key, "period": row.get("Period"), "status": row_status},
        )

    source_period = _required_row_value(row, "Period", spec=spec)
    quarter_end = parse_stats_nz_quarter_period(source_period)
    raw_value = _required_row_value(row, "Data_value", spec=spec)
    rounded_value = _parse_decimal(raw_value, field_name="Data_value").quantize(
        INDEX_VALUE_QUANTUM,
        rounding=ROUND_HALF_UP,
    )

    stored_fields = {
        "period_date": quarter_end,
        "value": rounded_value,
        "status": row_status,
        "source_asset_filename": spec.source_asset_filename,
        "source_series_reference": row.get("Series_reference", ""),
        "source_period": source_period,
        "source_units": row.get("UNITS", ""),
        "source_subject": row.get("Subject", ""),
        "source_group": row.get("Group", ""),
        "source_series_title_1": row.get("Series_title_1", ""),
    }
    # update_or_create already returns the (object, created) pair callers expect.
    return CostIndexValue.objects.update_or_create(
        series=series,
        period=source_period,
        defaults=stored_fields,
    )

394 

395 

def _row_matches_spec(row: dict[str, str], spec: StatsNzIndexSeriesSpec) -> bool:
    """Tell whether a source row is an ``Index``-unit row of the spec's series.

    All five identifying columns must equal the expected values exactly.
    """
    expected_by_column = (
        ("Series_reference", spec.source_series_id),
        ("UNITS", "Index"),
        ("Subject", spec.subject),
        ("Group", spec.group),
        ("Series_title_1", spec.series_title_1),
    )
    return all(row.get(column) == expected for column, expected in expected_by_column)

404 

405 

def _required_row_value(row: dict[str, str], field_name: str, *, spec: StatsNzIndexSeriesSpec) -> str:
    """Return a mandatory field from a source row.

    Args:
        row: One CSV row as produced by ``csv.DictReader``.
        field_name: Column that must be present and non-empty.
        spec: Series being imported; its key is reported on failure.

    Returns:
        The non-empty field text.

    Raises:
        StatsNzIndexImportError: With code ``required_source_field_missing``
            when the column is absent, empty, or ``None``.
    """
    value = row.get(field_name)
    # csv.DictReader fills the missing trailing columns of a short row with
    # None, so None must be treated as missing alongside "" and absent keys.
    if not value:
        raise StatsNzIndexImportError(
            "required_source_field_missing",
            "Stats NZ source row is missing a required field.",
            context={"series_key": spec.key, "field": field_name},
        )
    return value

415 

416 

def _parse_decimal(value: Decimal | int | str, *, field_name: str) -> Decimal:
    """Convert ``value`` to a finite ``Decimal`` via its string form.

    Args:
        value: Number or numeric text to convert.
        field_name: Name reported in the error context.

    Raises:
        StatsNzIndexImportError: With code ``invalid_decimal`` when the text
            is not numeric or parses to NaN/infinity.
    """
    try:
        parsed = Decimal(str(value))
    except (InvalidOperation, ValueError) as exc:
        raise StatsNzIndexImportError(
            "invalid_decimal",
            "Stats NZ index importer received a non-numeric decimal field.",
            context={"field": field_name, "value": str(value)},
        ) from exc

    if parsed.is_finite():
        return parsed

    # NaN and infinities parse successfully but are not usable values.
    raise StatsNzIndexImportError(
        "invalid_decimal",
        "Stats NZ index importer received a non-finite decimal field.",
        context={"field": field_name, "value": str(value)},
    )

433 

434 

def _get_index_value(*, series: CostIndexSeries, period: str, role: str) -> CostIndexValue:
    """Fetch the series' value for ``period``.

    Args:
        series: Series whose values are searched.
        period: Period string to look up.
        role: Label (``"historical"`` or ``"current"``) reported on failure.

    Raises:
        IndexEscalationError: With code ``index_value_missing`` when the
            series has no value for the period.
    """
    match = series.values.filter(period=period).first()
    if match is not None:
        return match
    raise IndexEscalationError(
        "index_value_missing",
        "Requested index period is not imported for this series.",
        context={"series_key": series.key, "period": period, "role": role},
    )

444 

445 

def _escalation_warnings(
    *,
    series: CostIndexSeries,
    historical_value: CostIndexValue,
    current_value: CostIndexValue,
) -> list[IndexEscalationWarning]:
    """Build one warning for each index value whose status is not ``FINAL``.

    The historical value is checked before the current one, so any warnings
    appear in that order.
    """
    checked_values = (
        (historical_value, "historical"),
        (current_value, "current"),
    )
    return [
        IndexEscalationWarning(
            code="index_value_not_final",
            severity="warning",
            message="Index escalation used a non-final Stats NZ index value.",
            context={"series_key": series.key, "period": item.period, "role": role, "status": item.status},
        )
        for item, role in checked_values
        if item.status != "FINAL"
    ]