Coverage for backend/django/Economics/reference_data/services/stats_nz_indexes.py: 92%
194 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1"""Import and use global Stats NZ economics index data.
3The locked CPI, CGPI, and PMEI fixture excerpts are platform reference data,
4not flowsheet-owned user data. The management command and service functions
5therefore populate one global series/value set that any flowsheet-scoped
6Economics study can reference through its assumptions. Service return values are
7Pydantic contracts so import diagnostics and escalation results have stable,
8serializable shapes at the JSON-field/API boundary.
9"""
11from __future__ import annotations
13import csv
14import zipfile
15from calendar import monthrange
16from datetime import date
17from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
18from pathlib import Path
19from typing import Any, Iterable
21from django.db import transaction
22from pydantic import BaseModel, ConfigDict
24from Economics.reference_data.models import CostIndexSeries, CostIndexValue
# Provider label written to (and used to look up) every series row this module manages.
STATS_NZ_PROVIDER = "Stats NZ"
# Imported index values are quantized to this step, i.e. eight decimal places.
INDEX_VALUE_QUANTUM = Decimal("0.00000001")
class EconomicsContract(BaseModel):
    """Common base for the Pydantic contracts declared in this module.

    The model configuration is frozen, so fields cannot be reassigned on an
    instance after it has been constructed.
    """

    model_config = ConfigDict(frozen=True)
class StatsNzIndexSeriesSpec(EconomicsContract):
    """Locked description of one Stats NZ index series and its source asset.

    The importer copies the identity and metadata fields onto the global
    series row, and uses the matching fields to select this series' rows
    from a source CSV.
    """

    # Identity of the global series row.
    key: str
    name: str
    source_series_id: str  # compared with the CSV "Series_reference" column

    # Release/asset metadata stored on the series row.
    source_url: str
    release_title: str
    source_asset_filename: str
    source_asset_file_id: str
    source_parent_id: str
    index_basis: str

    # Expected CSV column values that a row must carry to belong to this series.
    subject: str  # "Subject" column
    group: str  # "Group" column
    series_title_1: str  # "Series_title_1" column
class StatsNzImportSource(EconomicsContract):
    """One source file together with the locked series to extract from it."""

    # File to read: a CSV, or a ZIP wrapping a single CSV.
    path: Path
    # Every spec listed here must match at least one row of the file.
    specs: tuple[StatsNzIndexSeriesSpec, ...]
class StatsNzImportResult(EconomicsContract):
    """Summary counters returned by a Stats NZ index import run."""

    # Number of distinct series keys processed.
    series_imported: int
    # Index value rows newly created / already present and refreshed.
    values_created: int
    values_updated: int
    # Rows read across all source files, matched or not.
    rows_seen: int
    # Rows that matched a spec and were written.
    rows_imported: int
    # Series key -> period string of the latest value imported for that series.
    latest_period_by_series: dict[str, str]
class IndexEscalationWarning(EconomicsContract):
    """Non-fatal diagnostic attached to an escalation result."""

    # Machine-readable identifier of the warning.
    code: str
    # Severity label for the warning.
    severity: str
    # Human-readable explanation.
    message: str
    # Extra details identifying what the warning refers to.
    context: dict[str, Any]
class IndexEscalationResult(EconomicsContract):
    """Outcome of escalating one historical cost with an index series."""

    series_key: str
    # Cost as supplied by the caller, parsed to Decimal.
    historical_cost: Decimal
    # Periods of the stored index values that were used.
    historical_period: str
    current_period: str
    # Index values at those periods.
    historical_index: Decimal
    current_index: Decimal
    # current_index / historical_index.
    index_ratio: Decimal
    # historical_cost * index_ratio.
    current_cost: Decimal
    warnings: tuple[IndexEscalationWarning, ...]

    def warnings_payload(self) -> list[dict[str, Any]]:
        """Return the warnings as JSON-mode dicts for the model JSON/API boundary."""
        return [item.model_dump(mode="json") for item in self.warnings]
class StatsNzIndexImportError(ValueError):
    """Raised when a Stats NZ source file or row cannot be imported.

    Attributes:
        code: Machine-readable error identifier.
        message: Human-readable description; also the ``str()`` of the error.
        context: Extra diagnostic details. The error keeps its own copy, and
            an empty dict is used when none is given.
    """

    def __init__(self, code: str, message: str, *, context: dict[str, Any] | None = None):
        super().__init__(message)
        self.code = code
        self.message = message
        # Copy so that later mutation of the caller's dict cannot alter the error.
        self.context = dict(context) if context else {}

    def __reduce__(self) -> tuple[Any, ...]:
        # The default exception reduction rebuilds the instance from
        # ``self.args`` (the message only), which does not satisfy the two
        # required positional parameters. Supplying them explicitly keeps
        # copy/pickle round-trips working.
        return (type(self), (self.code, self.message), {"context": self.context})
class IndexEscalationError(ValueError):
    """Raised when a cost cannot be escalated with the requested index data.

    Attributes:
        code: Machine-readable error identifier.
        message: Human-readable description; also the ``str()`` of the error.
        context: Extra diagnostic details. The error keeps its own copy, and
            an empty dict is used when none is given.
    """

    def __init__(self, code: str, message: str, *, context: dict[str, Any] | None = None):
        super().__init__(message)
        self.code = code
        self.message = message
        # Copy so that later mutation of the caller's dict cannot alter the error.
        self.context = dict(context) if context else {}

    def __reduce__(self) -> tuple[Any, ...]:
        # The default exception reduction rebuilds the instance from
        # ``self.args`` (the message only), which does not satisfy the two
        # required positional parameters. Supplying them explicitly keeps
        # copy/pickle round-trips working.
        return (type(self), (self.code, self.message), {"context": self.context})
# Download URL recorded as the source of the CPI series (index-numbers CSV).
CPI_INDEX_SOURCE_URL = (
    "https://www.stats.govt.nz/assets/Uploads/Consumers-price-index/"
    "Consumers-price-index-March-2026-quarter/Download-data/"
    "consumers-price-index-march-2026-quarter-index-numbers.csv"
)
# Download URL recorded as the source of both business price index series (ZIP).
BPI_INDEX_SOURCE_URL = (
    "https://www.stats.govt.nz/assets/Uploads/Business-price-indexes/"
    "Business-price-indexes-March-2026-quarter/Download-data/"
    "business-price-indexes-march-2026-quarter.zip"
)
# Consumers price index, all groups (from the CPI index-numbers CSV).
STATS_NZ_CPI_ALL_GROUPS = StatsNzIndexSeriesSpec(
    key="stats_nz_cpi_all_groups",
    name="Stats NZ CPI all groups for New Zealand",
    source_series_id="CPIQ.SE9A",
    source_url=CPI_INDEX_SOURCE_URL,
    release_title="Consumers price index: March 2026 quarter",
    source_asset_filename="consumers-price-index-march-2026-quarter-index-numbers.csv",
    source_asset_file_id="122911",
    source_parent_id="122663",
    index_basis="June 2017 quarter = 1000",
    subject="CPI",
    group="CPI All Groups for New Zealand",
    series_title_1="All groups",
)

# Capital goods price index, all groups (from the business price indexes asset).
STATS_NZ_CGPI_ALL_GROUPS = StatsNzIndexSeriesSpec(
    key="stats_nz_cgpi_all_groups",
    name="Stats NZ capital goods price index all groups",
    source_series_id="CEPQ.S611",
    source_url=BPI_INDEX_SOURCE_URL,
    release_title="Business price indexes: March 2026 quarter",
    source_asset_filename="business-price-indexes-march-2026-quarter.zip",
    source_asset_file_id="123118",
    source_parent_id="122942",
    index_basis="September quarter 2022 = 1000",
    subject="Capital Goods Price Index - CEP",
    group="Price Index all groups of capital goods (Base: September quarter 2022 = 1000)",
    series_title_1="All Groups",
)

# Plant, machinery, and equipment index (same asset as the CGPI series).
STATS_NZ_PMEI = StatsNzIndexSeriesSpec(
    key="stats_nz_pmei",
    name="Stats NZ plant, machinery, and equipment index",
    source_series_id="CEPQ.S61106",
    source_url=BPI_INDEX_SOURCE_URL,
    release_title="Business price indexes: March 2026 quarter",
    source_asset_filename="business-price-indexes-march-2026-quarter.zip",
    source_asset_file_id="123118",
    source_parent_id="122942",
    index_basis="September quarter 2022 = 1000",
    subject="Capital Goods Price Index - CEP",
    group="Price Index asset types of capital goods (Base: September quarter 2022 = 1000)",
    series_title_1="Plant, Machinery, and Equipment",
)

# All locked specs, plus a lookup of the same specs by series key.
STATS_NZ_INDEX_SPECS = (
    STATS_NZ_CPI_ALL_GROUPS,
    STATS_NZ_CGPI_ALL_GROUPS,
    STATS_NZ_PMEI,
)
STATS_NZ_INDEX_SPECS_BY_KEY = {locked.key: locked for locked in STATS_NZ_INDEX_SPECS}
def default_stats_nz_fixture_dir() -> Path:
    """Return the default directory holding the Stats NZ index fixtures.

    The directory is ``fixtures/stats_nz_indexes`` under the third ancestor
    of this module's resolved file path.
    """
    module_file = Path(__file__).resolve()
    return module_file.parents[2].joinpath("fixtures", "stats_nz_indexes")
def import_locked_stats_nz_indexes(
    *,
    cpi_path: Path | str | None = None,
    bpi_path: Path | str | None = None,
) -> StatsNzImportResult:
    """Populate or refresh the global locked Stats NZ index set.

    Args:
        cpi_path: File holding the CPI series. Defaults to the CPI asset
            filename inside the default fixture directory.
        bpi_path: File holding the CGPI and PMEI series. Defaults to the
            excerpt CSV inside the default fixture directory.

    Returns:
        The counters reported by ``import_stats_nz_indexes``.
    """
    fixture_dir = default_stats_nz_fixture_dir()

    if cpi_path is None:
        cpi_file = fixture_dir / STATS_NZ_CPI_ALL_GROUPS.source_asset_filename
    else:
        cpi_file = Path(cpi_path)

    if bpi_path is None:
        bpi_file = fixture_dir / "stats_nz_bpi_cgpi_pmei_excerpt.csv"
    else:
        bpi_file = Path(bpi_path)

    cpi_source = StatsNzImportSource(path=cpi_file, specs=(STATS_NZ_CPI_ALL_GROUPS,))
    bpi_source = StatsNzImportSource(path=bpi_file, specs=(STATS_NZ_CGPI_ALL_GROUPS, STATS_NZ_PMEI))
    return import_stats_nz_indexes(sources=(cpi_source, bpi_source))
def import_stats_nz_indexes(
    *,
    sources: Iterable[StatsNzImportSource],
) -> StatsNzImportResult:
    """Import locked Stats NZ source rows into global index tables idempotently.

    All writes happen inside a single ``transaction.atomic()`` block. For
    each source file, every listed spec has its series upserted and each
    matching row upserted as an index value.

    Args:
        sources: Source files paired with the specs to extract from each.

    Returns:
        Counters for the run and the latest imported period per series key.

    Raises:
        StatsNzIndexImportError: If a spec matches no row in its source file,
            or if reading or validating a source row fails.
    """
    created_count = 0
    updated_count = 0
    total_rows_read = 0
    latest_periods: dict[str, str] = {}
    series_keys_seen: set[str] = set()

    with transaction.atomic():
        for source in sources:
            source_rows = _read_source_rows(source.path)
            total_rows_read += len(source_rows)

            for spec in source.specs:
                series = _upsert_series(spec=spec)
                series_keys_seen.add(spec.key)

                stored_values = []
                for source_row in source_rows:
                    if _row_matches_spec(source_row, spec):
                        stored, was_created = _upsert_value(series=series, spec=spec, row=source_row)
                        stored_values.append(stored)
                        if was_created:
                            created_count += 1
                        else:
                            updated_count += 1

                # A locked series absent from its source file fails the whole import.
                if not stored_values:
                    raise StatsNzIndexImportError(
                        "locked_series_missing_from_source",
                        "Stats NZ source did not contain the locked series.",
                        context={"series_key": spec.key, "source_series_id": spec.source_series_id, "path": str(source.path)},
                    )

                newest = max(stored_values, key=lambda item: item.period_date)
                series.latest_imported_period = newest.period
                series.save(update_fields=["latest_imported_period", "updated_at"])
                latest_periods[series.key] = newest.period

    return StatsNzImportResult(
        series_imported=len(series_keys_seen),
        values_created=created_count,
        values_updated=updated_count,
        rows_seen=total_rows_read,
        # Every written row is counted as exactly one of created/updated.
        rows_imported=created_count + updated_count,
        latest_period_by_series=latest_periods,
    )
def escalate_cost_with_index(
    *,
    series_key: str,
    historical_cost: Decimal | int | str,
    historical_period: str,
    current_period: str,
) -> IndexEscalationResult:
    """Escalate a historical cost with an exact global index ratio.

    The result cost is ``historical_cost * (current index / historical index)``
    using the stored values of the requested series at the two periods.

    Args:
        series_key: Key of the global index series to use.
        historical_cost: Cost expressed at ``historical_period``.
        historical_period: Period string of the index value to escalate from.
        current_period: Period string of the index value to escalate to.

    Returns:
        The inputs, both index values, the ratio, the escalated cost and any
        warnings about the index values used.

    Raises:
        StatsNzIndexImportError: If ``historical_cost`` is not a finite decimal
            (raised by the shared decimal parser).
        IndexEscalationError: If the series or either period is not imported,
            or the historical index value is zero.
    """
    parsed_cost = _parse_decimal(historical_cost, field_name="historical_cost")

    series = _get_locked_index_series(series_key)
    if series is None:
        raise IndexEscalationError(
            "index_series_missing",
            "Requested global index series is not imported.",
            context={"series_key": series_key},
        )

    base_point = _get_index_value(series=series, period=historical_period, role="historical")
    target_point = _get_index_value(series=series, period=current_period, role="current")

    # Guard the division below.
    if base_point.value == 0:
        raise IndexEscalationError(
            "historical_index_zero",
            "Historical index value is zero and cannot be used for escalation.",
            context={"series_key": series_key, "historical_period": historical_period},
        )

    ratio = target_point.value / base_point.value
    escalated_cost = parsed_cost * ratio
    notes = _escalation_warnings(series=series, historical_value=base_point, current_value=target_point)

    return IndexEscalationResult(
        series_key=series.key,
        historical_cost=parsed_cost,
        historical_period=base_point.period,
        current_period=target_point.period,
        historical_index=base_point.value,
        current_index=target_point.value,
        index_ratio=ratio,
        current_cost=escalated_cost,
        warnings=tuple(notes),
    )
def parse_stats_nz_quarter_period(period: str) -> date:
    """Convert a Stats NZ ``YYYY.MM`` quarter period into its month-end date.

    Args:
        period: Period text such as ``"2026.03"``; the month must be a
            quarter-end month (3, 6, 9 or 12).

    Returns:
        The last calendar day of the given month.

    Raises:
        StatsNzIndexImportError: With code ``invalid_period`` when the text is
            not two integers separated by a dot or the year cannot be
            represented as a date, and with code ``invalid_quarter_month``
            when the month is not a quarter-end month.
    """
    try:
        year_text, month_text = period.split(".", maxsplit=1)
        year = int(year_text)
        month = int(month_text)
    except (AttributeError, ValueError) as exc:
        # AttributeError covers a non-string period (for example None).
        raise StatsNzIndexImportError(
            "invalid_period",
            "Stats NZ period must use YYYY.MM format.",
            context={"period": period},
        ) from exc

    if month not in (3, 6, 9, 12):
        raise StatsNzIndexImportError(
            "invalid_quarter_month",
            "Stats NZ quarterly index period must end in March, June, September, or December.",
            context={"period": period, "month": month},
        )

    try:
        return date(year, month, monthrange(year, month)[1])
    except ValueError as exc:
        # A year outside the supported date range (e.g. "0000.03") must
        # surface as the coded import error, not a bare ValueError.
        raise StatsNzIndexImportError(
            "invalid_period",
            "Stats NZ period must use YYYY.MM format.",
            context={"period": period},
        ) from exc
def _read_source_rows(path: Path) -> list[dict[str, str]]:
    """Read every row of a Stats NZ source file as a header-keyed dict.

    A ``.zip`` path (suffix compared case-insensitively) must contain exactly
    one member whose name ends in ``.csv``; any other path is read directly
    as CSV. Text is decoded as UTF-8 with an optional leading byte-order mark.

    Args:
        path: Source CSV, or ZIP wrapping a single CSV.

    Returns:
        One dict per data row, in file order.

    Raises:
        StatsNzIndexImportError: With code ``source_file_missing`` when
            ``path`` is not an existing file, and with code
            ``zip_csv_member_ambiguous`` when a ZIP does not hold exactly one
            CSV member.
    """
    import io

    # is_file() also rejects directories, which would otherwise fail later
    # with a raw OSError instead of the coded import error.
    if not path.is_file():
        raise StatsNzIndexImportError(
            "source_file_missing",
            "Stats NZ source fixture was not found.",
            context={"path": str(path)},
        )

    if path.suffix.lower() == ".zip":
        with zipfile.ZipFile(path) as archive:
            csv_members = [name for name in archive.namelist() if name.lower().endswith(".csv")]
            if len(csv_members) != 1:
                raise StatsNzIndexImportError(
                    "zip_csv_member_ambiguous",
                    "Stats NZ BPI ZIP must contain exactly one CSV member for v1 import.",
                    context={"path": str(path), "csv_members": csv_members},
                )
            with archive.open(csv_members[0]) as raw_file:
                # Decode the member as one text stream so the BOM is stripped
                # only at the start of the file and the csv module sees
                # untranslated newlines, matching the plain-CSV branch.
                with io.TextIOWrapper(raw_file, encoding="utf-8-sig", newline="") as csv_file:
                    return list(csv.DictReader(csv_file))

    with path.open(newline="", encoding="utf-8-sig") as csv_file:
        return list(csv.DictReader(csv_file))
def _upsert_series(*, spec: StatsNzIndexSeriesSpec) -> CostIndexSeries:
    """Create or refresh the global series row described by ``spec``.

    The row is looked up by key, provider and source series id; the
    remaining fields are overwritten with the values from the spec.

    Returns:
        The created or updated series row.
    """
    refreshed_fields = {
        "name": spec.name,
        "frequency": "quarterly",
        "unit": "Index",
        "index_basis": spec.index_basis,
        "source_url": spec.source_url,
        "release_title": spec.release_title,
        "source_asset_filename": spec.source_asset_filename,
        "source_asset_file_id": spec.source_asset_file_id,
        "source_parent_id": spec.source_parent_id,
    }
    series_row, _created = CostIndexSeries.objects.update_or_create(
        key=spec.key,
        provider=STATS_NZ_PROVIDER,
        source_series_id=spec.source_series_id,
        defaults=refreshed_fields,
    )
    return series_row
def _get_locked_index_series(series_key: str) -> CostIndexSeries | None:
    """Return the stored series for ``series_key``, or ``None`` if absent.

    When the key belongs to a locked Stats NZ spec, the row must also carry
    the Stats NZ provider and that spec's source series id.
    """
    candidates = CostIndexSeries.objects.filter(key=series_key)
    locked_spec = STATS_NZ_INDEX_SPECS_BY_KEY.get(series_key)
    if locked_spec is None:
        return candidates.first()
    return candidates.filter(
        provider=STATS_NZ_PROVIDER,
        source_series_id=locked_spec.source_series_id,
    ).first()
def _upsert_value(
    *,
    series: CostIndexSeries,
    spec: StatsNzIndexSeriesSpec,
    row: dict[str, str],
) -> tuple[CostIndexValue, bool]:
    """Create or refresh the index value of ``series`` for one source row.

    Only rows whose ``STATUS`` is ``FINAL`` are accepted. The value is
    quantized to ``INDEX_VALUE_QUANTUM`` (rounding half up) and stored under
    the row's period, along with the source columns it was read from.

    Args:
        series: Series row that owns the value.
        spec: Spec the row was matched against.
        row: Source CSV row.

    Returns:
        The stored value, and ``True`` if it was created rather than updated.

    Raises:
        StatsNzIndexImportError: If the status is not ``FINAL``, a required
            field is missing, the period is malformed, or the data value is
            not a finite decimal that fits the stored precision.
    """
    status = row.get("STATUS", "")
    if status != "FINAL":
        raise StatsNzIndexImportError(
            "unsupported_index_status",
            "Stats NZ index importer only accepts FINAL rows in v1.",
            context={"series_key": spec.key, "period": row.get("Period"), "status": status},
        )

    period = _required_row_value(row, "Period", spec=spec)
    period_date = parse_stats_nz_quarter_period(period)
    raw_value = _parse_decimal(_required_row_value(row, "Data_value", spec=spec), field_name="Data_value")
    try:
        value = raw_value.quantize(INDEX_VALUE_QUANTUM, rounding=ROUND_HALF_UP)
    except InvalidOperation as exc:
        # quantize() signals InvalidOperation when the result would need more
        # digits than the decimal context allows; report it as a coded import
        # error instead of leaking the arithmetic exception.
        raise StatsNzIndexImportError(
            "invalid_decimal",
            "Stats NZ index importer received a decimal field that is too large to store.",
            context={"series_key": spec.key, "period": period, "field": "Data_value", "value": str(raw_value)},
        ) from exc

    index_value, created = CostIndexValue.objects.update_or_create(
        series=series,
        period=period,
        defaults={
            "period_date": period_date,
            "value": value,
            "status": status,
            "source_asset_filename": spec.source_asset_filename,
            "source_series_reference": row.get("Series_reference", ""),
            "source_period": period,
            "source_units": row.get("UNITS", ""),
            "source_subject": row.get("Subject", ""),
            "source_group": row.get("Group", ""),
            "source_series_title_1": row.get("Series_title_1", ""),
        },
    )
    return index_value, created
def _row_matches_spec(row: dict[str, str], spec: StatsNzIndexSeriesSpec) -> bool:
    """Return whether a source row belongs to the series described by ``spec``.

    The row must be an ``Index`` row whose series reference, subject, group
    and first series title all equal the values locked in the spec.
    """
    expected_columns = {
        "Series_reference": spec.source_series_id,
        "UNITS": "Index",
        "Subject": spec.subject,
        "Group": spec.group,
        "Series_title_1": spec.series_title_1,
    }
    return all(row.get(column) == wanted for column, wanted in expected_columns.items())
def _required_row_value(row: dict[str, str], field_name: str, *, spec: StatsNzIndexSeriesSpec) -> str:
    """Return the non-empty value of ``field_name`` from a source row.

    Args:
        row: Source CSV row.
        field_name: Column that must be present and non-empty.
        spec: Spec being imported; its key is reported in the error context.

    Returns:
        The column value, unchanged.

    Raises:
        StatsNzIndexImportError: With code ``required_source_field_missing``
            when the column is absent, empty or ``None``.
    """
    value = row.get(field_name, "")
    # csv.DictReader fills the missing trailing columns of a short row with
    # None, so None has to be treated as missing as well as "".
    if value is None or value == "":
        raise StatsNzIndexImportError(
            "required_source_field_missing",
            "Stats NZ source row is missing a required field.",
            context={"series_key": spec.key, "field": field_name},
        )
    return value
def _parse_decimal(value: Decimal | int | str, *, field_name: str) -> Decimal:
    """Parse ``value`` into a finite ``Decimal``.

    Args:
        value: Number or numeric text; it is converted through ``str()``.
        field_name: Name reported in the error context.

    Returns:
        The parsed decimal.

    Raises:
        StatsNzIndexImportError: With code ``invalid_decimal`` when the text
            is not numeric or the parsed value is not finite.
    """
    try:
        parsed = Decimal(str(value))
    except (InvalidOperation, ValueError) as exc:
        raise StatsNzIndexImportError(
            "invalid_decimal",
            "Stats NZ index importer received a non-numeric decimal field.",
            context={"field": field_name, "value": str(value)},
        ) from exc

    # NaN and the infinities parse successfully but are not usable numbers.
    if parsed.is_finite():
        return parsed
    raise StatsNzIndexImportError(
        "invalid_decimal",
        "Stats NZ index importer received a non-finite decimal field.",
        context={"field": field_name, "value": str(value)},
    )
def _get_index_value(*, series: CostIndexSeries, period: str, role: str) -> CostIndexValue:
    """Return the stored value of ``series`` for ``period``.

    Args:
        series: Series whose values are searched.
        period: Period string to look up.
        role: Label of the value's role in the escalation; reported in the
            error context only.

    Raises:
        IndexEscalationError: With code ``index_value_missing`` when the
            series has no value for the period.
    """
    found = series.values.filter(period=period).first()
    if found is not None:
        return found
    raise IndexEscalationError(
        "index_value_missing",
        "Requested index period is not imported for this series.",
        context={"series_key": series.key, "period": period, "role": role},
    )
def _escalation_warnings(
    *,
    series: CostIndexSeries,
    historical_value: CostIndexValue,
    current_value: CostIndexValue,
) -> list[IndexEscalationWarning]:
    """Build the warnings for the two index values used in an escalation.

    One warning is produced for each value whose status is not ``FINAL``,
    historical first and then current.
    """
    inspected = ((historical_value, "historical"), (current_value, "current"))
    return [
        IndexEscalationWarning(
            code="index_value_not_final",
            severity="warning",
            message="Index escalation used a non-final Stats NZ index value.",
            context={"series_key": series.key, "period": index_value.period, "role": role, "status": index_value.status},
        )
        for index_value, role in inspected
        if index_value.status != "FINAL"
    ]