Coverage for backend/django/Economics/shared/unit_options.py: 92%

99 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1"""Primitive economics unit-option helpers shared by domain serializers.""" 

2 

3from __future__ import annotations 

4 

5import re 

6from collections.abc import Iterable 

7 

8from core.auxiliary.enums.unitsLibrary import get_unit_choices 

9from core.auxiliary.enums.unitsOfMeasure import UnitOfMeasure 

10 

11 

12UnitOption = dict[str, str] 

13MAINTENANCE_RATE_UNIT = "fraction of FCI" 

14STEAM_PRICE_UNIT_OPTIONS: tuple[UnitOption, ...] = ( 

15 {"value": "NZD/t", "label": "NZD/t"}, 

16) 

17MAINTENANCE_RATE_UNIT_OPTIONS: tuple[UnitOption, ...] = ( 

18 {"value": MAINTENANCE_RATE_UNIT, "label": MAINTENANCE_RATE_UNIT}, 

19) 

20ENERGY_DENOMINATOR_UNITS = {"GJ", "MJ", "kWh", "MWh", "GWh"} 

21TIME_DENOMINATOR_UNITS = {"s", "min", "h", "d", "year"} 

22MASS_DENOMINATOR_UNITS = {"kg", "g", "t", "lb"} 

23VOLUME_DENOMINATOR_UNITS = {"m^3", "cm^3", "L", "ft^3"} 

24_PHYSICAL_UNIT_TYPES_FOR_LOOKUP = ( 

25 UnitOfMeasure.massflow, 

26 UnitOfMeasure.volumetricFlow, 

27 UnitOfMeasure.molarflow, 

28 UnitOfMeasure.heatflow, 

29 UnitOfMeasure.energy, 

30 UnitOfMeasure.mass, 

31 UnitOfMeasure.volume, 

32 UnitOfMeasure.area, 

33 UnitOfMeasure.time, 

34 UnitOfMeasure.ratio, 

35) 

36 

37 

38def registry_unit_values(unit_type: str) -> list[str]: 

39 return [value for value, _label in get_unit_choices(unit_type)] 

40 

41 

42def registry_unit_options(unit_type: str) -> list[UnitOption]: 

43 return unit_options_for_choices(get_unit_choices(unit_type)) 

44 

45 

46def price_denominator_units(unit_type: str) -> set[str]: 

47 return { 

48 value.partition("/")[2].strip() 

49 for value in registry_unit_values(unit_type) 

50 if "/" in value 

51 } 

52 

53 

54def price_denominator_options(unit_type: str) -> list[UnitOption]: 

55 options: list[UnitOption] = [] 

56 for value, label in get_unit_choices(unit_type): 

57 _value_prefix, value_separator, value_denominator = value.partition("/") 

58 _label_prefix, label_separator, label_denominator = label.partition("/") 

59 if value_separator != "/": 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true

60 continue 

61 options.append( 

62 { 

63 "value": value_denominator.strip(), 

64 "label": (label_denominator if label_separator == "/" else value_denominator).strip(), 

65 } 

66 ) 

67 return dedupe_unit_options(options) 

68 

69 

70def unit_options_for_choices(choices: Iterable[tuple[str, str]]) -> list[UnitOption]: 

71 return [{"value": value, "label": label} for value, label in choices] 

72 

73 

74def unit_options_for_unit_type(unit_type: str) -> list[UnitOption]: 

75 return unit_options_for_choices(get_unit_choices(unit_type)) 

76 

77 

78def with_current_unit(options: Iterable[UnitOption], current_unit: str | None) -> list[UnitOption]: 

79 """Return options with ``current_unit`` preserved even for custom legacy text.""" 

80 normalized_options = [dict(option) for option in options] 

81 current = (current_unit or "").strip() 

82 if current and all(option["value"] != current for option in normalized_options): 

83 return [{"value": current, "label": current}, *normalized_options] 

84 return normalized_options 

85 

86 

87def infer_physical_unit_type(unit: str) -> str: 

88 normalized = normalize_unit(unit) 

89 if not normalized: 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true

90 return "" 

91 for unit_type in _PHYSICAL_UNIT_TYPES_FOR_LOOKUP: 

92 for value, _label in get_unit_choices(unit_type): 

93 if normalize_unit(value) == normalized: 

94 return str(unit_type) 

95 return "" 

96 

97 

98def dedupe_unit_options(options: Iterable[UnitOption]) -> list[UnitOption]: 

99 seen: set[str] = set() 

100 deduped: list[UnitOption] = [] 

101 for option in options: 

102 value = option["value"] 

103 if value in seen: 

104 continue 

105 seen.add(value) 

106 deduped.append(dict(option)) 

107 return deduped 

108 

109 

110def with_currency(options: Iterable[UnitOption], *, currency: str) -> list[UnitOption]: 

111 return [ 

112 { 

113 "value": option["value"].replace("NZD", currency), 

114 "label": option["label"].replace("NZD", currency), 

115 } 

116 for option in options 

117 ] 

118 

119 

120def currency_price_options(unit_type: str, currency: str) -> list[UnitOption]: 

121 options = [] 

122 for value, _label in get_unit_choices(unit_type): 

123 converted = value.replace("megadollar", f"M{currency}").replace("dollar", currency) 

124 options.append({"value": converted, "label": converted}) 

125 return options 

126 

127 

128def denominator_price_options(unit_type: str, currency: str) -> list[UnitOption]: 

129 return [{"value": f"{currency}/{unit}", "label": f"{currency}/{unit}"} for unit, _label in get_unit_choices(unit_type)] 

130 

131 

132def price_unit_options_for_rate_unit( 

133 rate_unit: str, 

134 *, 

135 currency: str = "NZD", 

136 source_basis_unit: str = "", 

137 fixed_annual: bool = False, 

138) -> list[UnitOption]: 

139 current = (rate_unit or "").strip() 

140 denominator = rate_denominator_unit(current) 

141 source_unit = strip_time_denominator(source_basis_unit) 

142 

143 if current == MAINTENANCE_RATE_UNIT: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true

144 return with_current_unit(MAINTENANCE_RATE_UNIT_OPTIONS, current) 

145 if "steam" in current.lower(): 145 ↛ 146line 145 didn't jump to line 146 because the condition on line 145 was never true

146 return with_current_unit(with_currency(STEAM_PRICE_UNIT_OPTIONS, currency=currency), current) 

147 if denominator in ENERGY_DENOMINATOR_UNITS or source_unit in ENERGY_DENOMINATOR_UNITS: 

148 return with_current_unit(currency_price_options(UnitOfMeasure.powerPrice, currency), current) 

149 if denominator in MASS_DENOMINATOR_UNITS or source_unit in MASS_DENOMINATOR_UNITS: 

150 return with_current_unit(denominator_price_options(UnitOfMeasure.mass, currency), current) 

151 if denominator in VOLUME_DENOMINATOR_UNITS or source_unit in VOLUME_DENOMINATOR_UNITS: 151 ↛ 152line 151 didn't jump to line 152 because the condition on line 151 was never true

152 return with_current_unit(denominator_price_options(UnitOfMeasure.volume, currency), current) 

153 if denominator in TIME_DENOMINATOR_UNITS or fixed_annual: 

154 return with_current_unit(currency_price_options(UnitOfMeasure.costRate, currency), current) 

155 return with_current_unit([], current) 

156 

157 

158def currency_prefix(rate_unit: str) -> str: 

159 prefix, _separator, _denominator = (rate_unit or "").partition("/") 

160 return prefix if prefix.isalpha() and len(prefix) == 3 else "" 

161 

162 

163def rate_denominator_unit(rate_unit: str) -> str: 

164 _prefix, separator, denominator = (rate_unit or "").partition("/") 

165 return denominator.strip() if separator else "" 

166 

167 

168def strip_time_denominator(unit: str) -> str: 

169 value = (unit or "").strip() 

170 match = re.fullmatch(r"(.+)/(s|min|h|hr|d|day|year)", value) 

171 if not match: 

172 return value 

173 return normalize_unit(match.group(1)) 

174 

175 

176def normalize_unit(unit: str) -> str: 

177 return (unit or "").strip().replace("hr", "h").replace("tonne", "t")