Coverage for backend/django/Economics/results/services/lifecycle/runs.py: 100%

54 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1"""Result-run reuse, classification, and stale-state transitions. 

2 

3This module owns status decisions for persisted result runs. It compares stored 

4fingerprints with current source fingerprints, annotates reuse/stale events, and 

5must not calculate new economics metrics or materialize result lines. 

6""" 

7 

8from __future__ import annotations 

9 

10from django.db import transaction 

11from django.utils import timezone 

12 

13from Economics.results.models import EconomicsResultRun 

14 

15from Economics.studies.models import EconomicsStudy 

16 

17from Economics.shared.choices import ResultRunStatus 

18from Economics.results.services.lifecycle.fingerprints import ( 

19 DependencyFingerprint, 

20 build_dependency_fingerprints, 

21 _fingerprint_map, 

22 _stored_dependency_map, 

23) 

24 

25 

26class ResultRunClassification: 

27 """Stable result-run classification labels exposed through serializers.""" 

28 

29 CURRENT = "current" 

30 STALE = "stale" 

31 REUSABLE = "reusable" 

32 

33 

34 

35def classify_result_run(result_run: EconomicsResultRun) -> str: 

36 """Classify a persisted run against the study's current dependency state.""" 

37 current_map = _fingerprint_map(build_dependency_fingerprints(result_run.study)) 

38 run_map = _stored_dependency_map(result_run) 

39 if run_map == current_map: 

40 if result_run.status == ResultRunStatus.CURRENT: 

41 return ResultRunClassification.CURRENT 

42 return ResultRunClassification.REUSABLE 

43 return ResultRunClassification.STALE 

44 

45 

46def mark_result_runs_stale_for_study( 

47 *, 

48 study: EconomicsStudy, 

49 reason: str, 

50 requires_solve: bool = False, 

51) -> int: 

52 """Mark current runs stale with diagnostic context, without calculating new results.""" 

53 with transaction.atomic(): 

54 runs = list( 

55 EconomicsResultRun.objects.select_for_update().filter( 

56 flowsheet=study.flowsheet, 

57 study=study, 

58 status=ResultRunStatus.CURRENT, 

59 ) 

60 ) 

61 for run in runs: 

62 _mark_run_stale(run=run, reason=reason, requires_solve=requires_solve) 

63 return len(runs) 

64 

65 

66def _matching_result_run( 

67 *, 

68 study: EconomicsStudy, 

69 fingerprints: list[DependencyFingerprint], 

70) -> EconomicsResultRun | None: 

71 """Return the newest current/stale run whose stored dependencies still match.""" 

72 current_map = _fingerprint_map(fingerprints) 

73 for status in (ResultRunStatus.CURRENT, ResultRunStatus.STALE): 

74 for result_run in EconomicsResultRun.objects.filter( 

75 flowsheet=study.flowsheet, 

76 study=study, 

77 status=status, 

78 ).order_by("-created_at"): 

79 if _stored_dependency_map(result_run) == current_map: 

80 return result_run 

81 return None 

82 

83 

84def _mark_nonmatching_current_runs_stale( 

85 *, 

86 study: EconomicsStudy, 

87 fingerprints: list[DependencyFingerprint], 

88 reason: str, 

89) -> None: 

90 """Mark only current runs stale when their dependency map no longer matches.""" 

91 current_map = _fingerprint_map(fingerprints) 

92 for result_run in EconomicsResultRun.objects.select_for_update().filter( 

93 flowsheet=study.flowsheet, 

94 study=study, 

95 status=ResultRunStatus.CURRENT, 

96 ): 

97 if _stored_dependency_map(result_run) != current_map: 

98 _mark_run_stale(run=result_run, reason=reason, requires_solve=False) 

99 

100 

101def _mark_run_stale(*, run: EconomicsResultRun, reason: str, requires_solve: bool) -> None: 

102 """Append a stale event without discarding existing warning diagnostics.""" 

103 payload = dict(run.warning_payload or {}) 

104 stale_events = list(payload.get("stale_events", [])) 

105 stale_events.append( 

106 { 

107 "reason": reason, 

108 "requires_solve": requires_solve, 

109 "marked_at": timezone.now().isoformat(), 

110 "previous_status": run.status, 

111 } 

112 ) 

113 payload["stale_events"] = stale_events 

114 payload["latest_stale_reason"] = reason 

115 payload["requires_solve"] = requires_solve 

116 run.status = ResultRunStatus.STALE 

117 run.warning_payload = payload 

118 run.save(update_fields=["status", "warning_payload"]) 

119 

120 

121def _annotate_reused_run(*, result_run: EconomicsResultRun, reason: str, duration_ms: int) -> None: 

122 """Record reuse diagnostics after a matching run is promoted or refreshed.""" 

123 payload = dict(result_run.warning_payload or {}) 

124 reuse_events = list(payload.get("reuse_events", [])) 

125 reuse_events.append( 

126 { 

127 "reason": reason, 

128 "reused_at": timezone.now().isoformat(), 

129 "duration_ms": duration_ms, 

130 } 

131 ) 

132 payload["reuse_events"] = reuse_events 

133 result_run.warning_payload = payload 

134 result_run.save(update_fields=["warning_payload"])