Coverage for backend/django/Economics/results/services/lifecycle/runs.py: 100%
54 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1"""Result-run reuse, classification, and stale-state transitions.
3This module owns status decisions for persisted result runs. It compares stored
4fingerprints with current source fingerprints, annotates reuse/stale events, and
5must not calculate new economics metrics or materialize result lines.
6"""
8from __future__ import annotations
10from django.db import transaction
11from django.utils import timezone
13from Economics.results.models import EconomicsResultRun
15from Economics.studies.models import EconomicsStudy
17from Economics.shared.choices import ResultRunStatus
18from Economics.results.services.lifecycle.fingerprints import (
19 DependencyFingerprint,
20 build_dependency_fingerprints,
21 _fingerprint_map,
22 _stored_dependency_map,
23)
class ResultRunClassification:
    """Namespace of the string labels used to classify a persisted result run.

    The values are plain strings; per the module's original note they are the
    stable labels exposed through serializers, so they must not be renamed.
    """

    # Stored dependencies match and the run's status is CURRENT.
    CURRENT = "current"
    # Stored dependencies no longer match the study's current fingerprints.
    STALE = "stale"
    # Stored dependencies match but the run's status is not CURRENT.
    REUSABLE = "reusable"
def classify_result_run(result_run: EconomicsResultRun) -> str:
    """Return the classification label for a persisted run.

    The fingerprints built for ``result_run.study`` are compared with the
    dependency map stored on the run:

    * maps differ -> ``ResultRunClassification.STALE``
    * maps equal and the run's status is CURRENT -> ``ResultRunClassification.CURRENT``
    * maps equal with any other status -> ``ResultRunClassification.REUSABLE``
    """
    live_map = _fingerprint_map(build_dependency_fingerprints(result_run.study))
    stored_map = _stored_dependency_map(result_run)

    # Guard clause: any dependency drift makes the run stale regardless of status.
    if stored_map != live_map:
        return ResultRunClassification.STALE

    return (
        ResultRunClassification.CURRENT
        if result_run.status == ResultRunStatus.CURRENT
        else ResultRunClassification.REUSABLE
    )
def mark_result_runs_stale_for_study(
    *,
    study: EconomicsStudy,
    reason: str,
    requires_solve: bool = False,
) -> int:
    """Flip every CURRENT run of ``study`` to stale and report how many changed.

    The matching rows are selected with ``select_for_update()`` inside a
    transaction, then each one receives a stale event carrying ``reason`` and
    ``requires_solve``. No new results are calculated here.

    Returns:
        The number of runs that were marked stale.
    """
    with transaction.atomic():
        locked_query = EconomicsResultRun.objects.select_for_update().filter(
            flowsheet=study.flowsheet,
            study=study,
            status=ResultRunStatus.CURRENT,
        )
        # Materialise once so the set being updated is fixed before any row's
        # status is rewritten.
        affected = tuple(locked_query)
        for stale_candidate in affected:
            _mark_run_stale(
                run=stale_candidate,
                reason=reason,
                requires_solve=requires_solve,
            )
    return len(affected)
def _matching_result_run(
    *,
    study: EconomicsStudy,
    fingerprints: list[DependencyFingerprint],
) -> EconomicsResultRun | None:
    """Find a run of ``study`` whose stored dependency map equals ``fingerprints``.

    CURRENT runs are searched before STALE ones; within each status the runs
    are visited newest-first (``-created_at``). The first match wins.

    Returns:
        The matching run, or ``None`` when no CURRENT or STALE run matches.
    """
    wanted_map = _fingerprint_map(fingerprints)
    study_runs = EconomicsResultRun.objects.filter(
        flowsheet=study.flowsheet,
        study=study,
    )
    # Lazy generator: the STALE query is only issued if no CURRENT run matches.
    matches = (
        candidate
        for wanted_status in (ResultRunStatus.CURRENT, ResultRunStatus.STALE)
        for candidate in study_runs.filter(status=wanted_status).order_by("-created_at")
        if _stored_dependency_map(candidate) == wanted_map
    )
    return next(matches, None)
def _mark_nonmatching_current_runs_stale(
    *,
    study: EconomicsStudy,
    fingerprints: list[DependencyFingerprint],
    reason: str,
) -> None:
    """Mark CURRENT runs stale when their stored dependency map no longer matches.

    Only runs of ``study`` whose status is CURRENT are examined; a run whose
    stored dependency map equals the map built from ``fingerprints`` is left
    untouched. Stale events recorded here always carry ``requires_solve=False``.

    Args:
        study: Study whose CURRENT runs are checked.
        fingerprints: Current dependency fingerprints to compare against.
        reason: Diagnostic reason stored on each stale event.
    """
    current_map = _fingerprint_map(fingerprints)
    # select_for_update() must be evaluated inside a transaction; Django raises
    # TransactionManagementError in autocommit mode. When the caller already
    # holds a transaction this atomic() is just a nested savepoint.
    with transaction.atomic():
        locked_current_runs = EconomicsResultRun.objects.select_for_update().filter(
            flowsheet=study.flowsheet,
            study=study,
            status=ResultRunStatus.CURRENT,
        )
        for result_run in locked_current_runs:
            if _stored_dependency_map(result_run) != current_map:
                _mark_run_stale(run=result_run, reason=reason, requires_solve=False)
def _mark_run_stale(*, run: EconomicsResultRun, reason: str, requires_solve: bool) -> None:
    """Set ``run`` to STALE and log why, keeping prior warning diagnostics.

    A new entry is appended to ``warning_payload["stale_events"]`` and the
    ``latest_stale_reason`` / ``requires_solve`` keys are overwritten. Only the
    ``status`` and ``warning_payload`` columns are saved.
    """
    # Build the event before touching run.status so "previous_status" records
    # the value the run had on entry.
    event = {
        "reason": reason,
        "requires_solve": requires_solve,
        "marked_at": timezone.now().isoformat(),
        "previous_status": run.status,
    }

    # Work on a copy so the existing payload object is never mutated in place.
    diagnostics = dict(run.warning_payload) if run.warning_payload else {}
    diagnostics["stale_events"] = [*diagnostics.get("stale_events", []), event]
    diagnostics["latest_stale_reason"] = reason
    diagnostics["requires_solve"] = requires_solve

    run.status = ResultRunStatus.STALE
    run.warning_payload = diagnostics
    run.save(update_fields=["status", "warning_payload"])
def _annotate_reused_run(*, result_run: EconomicsResultRun, reason: str, duration_ms: int) -> None:
    """Append a reuse event to ``result_run.warning_payload`` and persist it.

    Existing payload keys are preserved; the new entry (reason, timestamp and
    ``duration_ms``) is added to the ``reuse_events`` list. Only the
    ``warning_payload`` column is saved; the run's status is not changed here.
    """
    event = {
        "reason": reason,
        "reused_at": timezone.now().isoformat(),
        "duration_ms": duration_ms,
    }

    # Copy first so the payload currently attached to the run is not mutated.
    diagnostics = dict(result_run.warning_payload) if result_run.warning_payload else {}
    diagnostics["reuse_events"] = [*diagnostics.get("reuse_events", []), event]

    result_run.warning_payload = diagnostics
    result_run.save(update_fields=["warning_payload"])