Coverage for backend/django/Economics/shared/unit_options.py: 92%
99 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1"""Primitive economics unit-option helpers shared by domain serializers."""
3from __future__ import annotations
5import re
6from collections.abc import Iterable
8from core.auxiliary.enums.unitsLibrary import get_unit_choices
9from core.auxiliary.enums.unitsOfMeasure import UnitOfMeasure
# A single selectable unit, serialised as {"value": ..., "label": ...}.
UnitOption = dict[str, str]

# The only unit offered for maintenance rates.
MAINTENANCE_RATE_UNIT = "fraction of FCI"

# Steam prices are declared in NZD; with_currency() swaps in another currency.
STEAM_PRICE_UNIT_OPTIONS: tuple[UnitOption, ...] = (
    dict(value="NZD/t", label="NZD/t"),
)
MAINTENANCE_RATE_UNIT_OPTIONS: tuple[UnitOption, ...] = (
    dict(value=MAINTENANCE_RATE_UNIT, label=MAINTENANCE_RATE_UNIT),
)

# Denominator spellings that identify which unit family a rate unit belongs to.
ENERGY_DENOMINATOR_UNITS = {
    "MJ",
    "GJ",
    "kWh",
    "MWh",
    "GWh",
}
TIME_DENOMINATOR_UNITS = {
    "s",
    "min",
    "h",
    "d",
    "year",
}
MASS_DENOMINATOR_UNITS = {
    "g",
    "kg",
    "t",
    "lb",
}
VOLUME_DENOMINATOR_UNITS = {
    "L",
    "cm^3",
    "m^3",
    "ft^3",
}
# Unit families searched by infer_physical_unit_type(). They are tried in this
# order and the first family whose choices contain the unit is returned, so the
# ordering decides the result when a unit appears in more than one family.
_PHYSICAL_UNIT_TYPES_FOR_LOOKUP = (
    UnitOfMeasure.massflow,
    UnitOfMeasure.volumetricFlow,
    UnitOfMeasure.molarflow,
    UnitOfMeasure.heatflow,
    UnitOfMeasure.energy,
    UnitOfMeasure.mass,
    UnitOfMeasure.volume,
    UnitOfMeasure.area,
    UnitOfMeasure.time,
    UnitOfMeasure.ratio,
)
def registry_unit_values(unit_type: str) -> list[str]:
    """Return only the values of the choices registered for ``unit_type``.

    The order is the order in which ``get_unit_choices`` yields its
    ``(value, label)`` pairs; labels are discarded.
    """
    values: list[str] = []
    for choice_value, _choice_label in get_unit_choices(unit_type):
        values.append(choice_value)
    return values
def registry_unit_options(unit_type: str) -> list[UnitOption]:
    """Return the choices registered for ``unit_type`` as option dicts."""
    registered_choices = get_unit_choices(unit_type)
    return unit_options_for_choices(registered_choices)
def price_denominator_units(unit_type: str) -> set[str]:
    """Collect the denominators of the registered values of ``unit_type``.

    Only values containing "/" contribute; the denominator is everything after
    the first "/", with surrounding whitespace removed.
    """
    denominators: set[str] = set()
    for unit_value in registry_unit_values(unit_type):
        _numerator, slash, denominator = unit_value.partition("/")
        if slash:
            denominators.add(denominator.strip())
    return denominators
def price_denominator_options(unit_type: str) -> list[UnitOption]:
    """Build de-duplicated options from the denominators of ``unit_type`` choices.

    For each registered ``(value, label)`` pair whose value contains "/", the
    option value is the text after the first "/" of the value. The option
    label is the text after the first "/" of the label, or the value's
    denominator when the label has no "/". Both are stripped of surrounding
    whitespace. Choices whose value has no "/" are skipped.
    """
    collected: list[UnitOption] = []
    for value, label in get_unit_choices(unit_type):
        value_parts = value.partition("/")
        label_parts = label.partition("/")
        if value_parts[1] != "/":
            continue
        denominator = value_parts[2]
        shown = label_parts[2] if label_parts[1] == "/" else denominator
        collected.append({"value": denominator.strip(), "label": shown.strip()})
    return dedupe_unit_options(collected)
def unit_options_for_choices(choices: Iterable[tuple[str, str]]) -> list[UnitOption]:
    """Convert ``(value, label)`` pairs into ``{"value", "label"}`` option dicts."""
    options: list[UnitOption] = []
    for choice_value, choice_label in choices:
        options.append({"value": choice_value, "label": choice_label})
    return options
def unit_options_for_unit_type(unit_type: str) -> list[UnitOption]:
    """Return option dicts for every choice registered under ``unit_type``."""
    choices_for_type = get_unit_choices(unit_type)
    return unit_options_for_choices(choices_for_type)
def with_current_unit(options: Iterable[UnitOption], current_unit: str | None) -> list[UnitOption]:
    """Return copies of ``options``, making sure ``current_unit`` is selectable.

    When the stripped ``current_unit`` is non-empty and no option already has
    it as its value, an option for it is placed first. This keeps custom
    legacy text available. The input option dicts are never mutated or reused.
    """
    copies = [dict(option) for option in options]
    wanted = (current_unit or "").strip()
    if not wanted:
        return copies
    for entry in copies:
        if entry["value"] == wanted:
            # Already offered; nothing to add.
            return copies
    return [{"value": wanted, "label": wanted}, *copies]
def infer_physical_unit_type(unit: str) -> str:
    """Return the first physical unit type whose choices include ``unit``.

    Both ``unit`` and the registered values are compared after
    ``normalize_unit``. The types in ``_PHYSICAL_UNIT_TYPES_FOR_LOOKUP`` are
    tried in order. An empty string is returned when ``unit`` normalises to
    nothing or when no type matches.
    """
    target = normalize_unit(unit)
    if target:
        for candidate_type in _PHYSICAL_UNIT_TYPES_FOR_LOOKUP:
            if any(
                normalize_unit(choice_value) == target
                for choice_value, _choice_label in get_unit_choices(candidate_type)
            ):
                return str(candidate_type)
    return ""
def dedupe_unit_options(options: Iterable[UnitOption]) -> list[UnitOption]:
    """Drop options whose ``"value"`` was already seen, keeping the first.

    The surviving options keep their original relative order and are returned
    as shallow copies.
    """
    first_by_value: dict[str, UnitOption] = {}
    for option in options:
        key = option["value"]
        if key not in first_by_value:
            # dicts preserve insertion order, so first occurrences stay in sequence.
            first_by_value[key] = dict(option)
    return list(first_by_value.values())
def with_currency(options: Iterable[UnitOption], *, currency: str) -> list[UnitOption]:
    """Return new options with every "NZD" replaced by ``currency``.

    The replacement is applied to both the value and the label. Only the
    ``"value"`` and ``"label"`` keys are carried over to the result.
    """
    converted: list[UnitOption] = []
    for option in options:
        new_value = option["value"].replace("NZD", currency)
        new_label = option["label"].replace("NZD", currency)
        converted.append({"value": new_value, "label": new_label})
    return converted
def currency_price_options(unit_type: str, currency: str) -> list[UnitOption]:
    """Return the ``unit_type`` choices with dollar placeholders filled in.

    In each registered value, "megadollar" becomes ``"M" + currency`` and any
    remaining "dollar" becomes ``currency``. The converted text is used for
    both the option value and its label; the registered labels are ignored.
    """
    mega_symbol = f"M{currency}"
    converted_units = (
        # "megadollar" must be replaced first because it contains "dollar".
        choice_value.replace("megadollar", mega_symbol).replace("dollar", currency)
        for choice_value, _choice_label in get_unit_choices(unit_type)
    )
    return [{"value": text, "label": text} for text in converted_units]
def denominator_price_options(unit_type: str, currency: str) -> list[UnitOption]:
    """Return ``"<currency>/<unit>"`` options for each choice of ``unit_type``.

    The same text serves as value and label; the registered labels are ignored.
    """
    priced: list[UnitOption] = []
    for unit, _label in get_unit_choices(unit_type):
        price_unit = f"{currency}/{unit}"
        priced.append({"value": price_unit, "label": price_unit})
    return priced
def price_unit_options_for_rate_unit(
    rate_unit: str,
    *,
    currency: str = "NZD",
    source_basis_unit: str = "",
    fixed_annual: bool = False,
) -> list[UnitOption]:
    """Return the price-unit options that suit ``rate_unit``.

    The unit family is chosen by the first rule that matches, in this order:

    1. ``rate_unit`` equals ``MAINTENANCE_RATE_UNIT``: the maintenance-rate
       options.
    2. ``rate_unit`` contains "steam" (case-insensitive): the steam price
       options with "NZD" replaced by ``currency``.
    3. The rate denominator or the source basis unit is an energy unit:
       ``UnitOfMeasure.powerPrice`` options in ``currency``.
    4. Either is a mass unit: ``currency``/mass-unit options.
    5. Either is a volume unit: ``currency``/volume-unit options.
    6. The rate denominator is a time unit, or ``fixed_annual`` is true:
       ``UnitOfMeasure.costRate`` options in ``currency``.
    7. Otherwise: no registry options.

    In every case the stripped ``rate_unit`` is prepended when it is non-empty
    and not already offered, so the stored unit always stays selectable.

    Args:
        rate_unit: The currently stored price unit, e.g. ``"NZD/kWh"``.
        currency: Currency code used when building the options.
        source_basis_unit: Unit of the quantity being priced; a trailing time
            denominator (``"/h"``, ``"/year"``, ...) is ignored.
        fixed_annual: Treat the rate as a cost rate even without a time
            denominator.

    Returns:
        A new list of option dicts.
    """
    current = (rate_unit or "").strip()
    # Compare normalised spellings so legacy text such as "NZD/hr" or
    # "NZD/tonne" still selects its unit family instead of falling through to
    # the empty list. No member of the *_DENOMINATOR_UNITS sets is changed by
    # normalisation, so units that matched before still match.
    denominator = normalize_unit(rate_denominator_unit(current))
    source_unit = normalize_unit(strip_time_denominator(source_basis_unit))

    if current == MAINTENANCE_RATE_UNIT:
        return with_current_unit(MAINTENANCE_RATE_UNIT_OPTIONS, current)
    if "steam" in current.lower():
        return with_current_unit(with_currency(STEAM_PRICE_UNIT_OPTIONS, currency=currency), current)
    if denominator in ENERGY_DENOMINATOR_UNITS or source_unit in ENERGY_DENOMINATOR_UNITS:
        return with_current_unit(currency_price_options(UnitOfMeasure.powerPrice, currency), current)
    if denominator in MASS_DENOMINATOR_UNITS or source_unit in MASS_DENOMINATOR_UNITS:
        return with_current_unit(denominator_price_options(UnitOfMeasure.mass, currency), current)
    if denominator in VOLUME_DENOMINATOR_UNITS or source_unit in VOLUME_DENOMINATOR_UNITS:
        return with_current_unit(denominator_price_options(UnitOfMeasure.volume, currency), current)
    if denominator in TIME_DENOMINATOR_UNITS or fixed_annual:
        return with_current_unit(currency_price_options(UnitOfMeasure.costRate, currency), current)
    return with_current_unit([], current)
def currency_prefix(rate_unit: str) -> str:
    """Return the three-letter currency code that starts ``rate_unit``.

    The prefix is the text before the first "/" (or the whole string when
    there is no "/"), with surrounding whitespace ignored. It is returned only
    when it consists of exactly three alphabetic characters; otherwise, and
    for empty or ``None`` input, an empty string is returned.
    """
    prefix, _separator, _denominator = (rate_unit or "").partition("/")
    # Tolerate stray whitespace (" NZD/t", "NZD /t") the same way the
    # denominator helpers do, instead of rejecting the prefix on length.
    prefix = prefix.strip()
    return prefix if prefix.isalpha() and len(prefix) == 3 else ""
def rate_denominator_unit(rate_unit: str) -> str:
    """Return the text after the first "/" of ``rate_unit``, stripped.

    An empty string is returned when ``rate_unit`` is empty, ``None``, or
    contains no "/".
    """
    text = rate_unit or ""
    if "/" not in text:
        return ""
    return text.split("/", 1)[1].strip()
def strip_time_denominator(unit: str) -> str:
    """Return ``unit`` normalised, without a trailing per-time denominator.

    A trailing ``/s``, ``/min``, ``/h``, ``/hr``, ``/d``, ``/day`` or ``/year``
    is removed, so a flow unit such as ``"kg/h"`` yields its basis ``"kg"``.
    The result is passed through ``normalize_unit`` whether or not a time
    denominator was present, so ``"tonne"`` and ``"tonne/hr"`` both give
    ``"t"``. Empty or ``None`` input gives an empty string.
    """
    value = (unit or "").strip()
    match = re.fullmatch(r"(.+)/(s|min|h|hr|d|day|year)", value)
    # Normalise on both paths so callers can compare the result against the
    # canonical unit spellings regardless of the input's shape.
    return normalize_unit(match.group(1) if match else value)


def normalize_unit(unit: str) -> str:
    """Return ``unit`` stripped, with "hr" spelled "h" and "tonne" spelled "t".

    The replacements are plain substring replacements applied to the whole
    string. Empty or ``None`` input gives an empty string.
    """
    return (unit or "").strip().replace("hr", "h").replace("tonne", "t")