Coverage for backend/pinch_service/OpenPinch/src/classes/zone.py: 74%

159 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1from __future__ import annotations 

2from ..lib.enums import * 

3from ..lib.config import * 

4from typing import Optional, TYPE_CHECKING 

5from .stream_collection import StreamCollection 

6from .value import Value 

7from .target import Target 

8 

9if TYPE_CHECKING: 

10 from .stream import Stream 

11 

12 

13class Zone(): 

14 """Class representing any type of spatial zone or target (operation, zone, site, region, etc).""" 

15 

16 def __init__(self, name: str = "Zone", identifier: str = ZoneType.P.value, config: Optional[Configuration] = None, parent_zone: Zone = None): 

17 

18 # === Metadata === 

19 self._name = name 

20 self._identifier = identifier 

21 self._config = config or Configuration() 

22 self._parent_zone = parent_zone 

23 self._active = True 

24 self._subzones = {} 

25 self._targets = {} 

26 

27 # === Streams & Utilities === 

28 self._hot_streams: StreamCollection = StreamCollection() 

29 self._cold_streams: StreamCollection = StreamCollection() 

30 self._net_hot_streams: StreamCollection = StreamCollection() 

31 self._net_cold_streams: StreamCollection = StreamCollection() 

32 self._hot_utilities: StreamCollection = StreamCollection() 

33 self._cold_utilities: StreamCollection = StreamCollection() 

34 

35 # === Properties === 

36 

37 @property 

38 def name(self): return self._name 

39 @name.setter 

40 def name(self, value): self._name = value 40 ↛ exitline 40 didn't return from function 'name' because

41 

42 @property 

43 def identifier(self): return self._identifier 

44 @identifier.setter 

45 def identifier(self, value): self._identifier = value 45 ↛ exitline 45 didn't return from function 'identifier' because

46 

47 @property 

48 def config(self): return self._config 

49 @config.setter 

50 def config(self, value): self._config = value 50 ↛ exitline 50 didn't return from function 'config' because

51 

52 @property 

53 def parent_zone(self): return self._parent_zone 

54 @parent_zone.setter 

55 def parent_zone(self, value): self._parent_zone = value 55 ↛ exitline 55 didn't return from function 'parent_zone' because

56 

57 @property 

58 def active(self) -> bool: 

59 """Whether the stream is active in analysis.""" 

60 if isinstance(self._active, Value): 

61 return self._active.value 

62 else: 

63 return self._active 

64 @active.setter 

65 def active(self, value: bool): 

66 self._active = Value(value) 

67 

68 @property 

69 def hot_streams(self): return self._hot_streams 

70 @hot_streams.setter 

71 def hot_streams(self, data): self._hot_streams = data 71 ↛ exitline 71 didn't return from function 'hot_streams' because

72 

73 @property 

74 def cold_streams(self): return self._cold_streams 

75 @cold_streams.setter 

76 def cold_streams(self, data): self._cold_streams = data 76 ↛ exitline 76 didn't return from function 'cold_streams' because

77 

78 @property 

79 def net_hot_streams(self): return self._net_hot_streams 

80 @net_hot_streams.setter 

81 def net_hot_streams(self, data): self._net_hot_streams = data 81 ↛ exitline 81 didn't return from function 'net_hot_streams' because

82 

83 @property 

84 def net_cold_streams(self): return self._net_cold_streams 

85 @net_cold_streams.setter 

86 def net_cold_streams(self, data): self._net_cold_streams = data 86 ↛ exitline 86 didn't return from function 'net_cold_streams' because

87 

88 @property 

89 def hot_utilities(self): return self._hot_utilities 

90 @hot_utilities.setter 

91 def hot_utilities(self, data): self._hot_utilities = data 91 ↛ exitline 91 didn't return from function 'hot_utilities' because

92 

93 @property 

94 def cold_utilities(self): return self._cold_utilities 

95 @cold_utilities.setter 

96 def cold_utilities(self, data): self._cold_utilities = data 96 ↛ exitline 96 didn't return from function 'cold_utilities' because

97 

98 # @property 

99 # def graphs(self): return self._graphs 

100 # @graphs.setter 

101 # def graphs(self, data): self._graphs = data  

102 

103 @property 

104 def subzones(self): return self._subzones 

105 

106 @property 

107 def targets(self): return self._targets 

108 

109 @property 

110 def process_streams(self): 

111 return self._hot_streams + self._cold_streams 

112 

113 @property 

114 def net_process_streams(self): 

115 return self._net_hot_streams + self._net_cold_streams 

116 

117 @property 

118 def utility_streams(self): 

119 return self._hot_utilities + self._cold_utilities 

120 

121 @property 

122 def all_streams(self): 

123 return self.process_streams + self.utility_streams 

124 

125 @property 

126 def all_net_streams(self): 

127 return self.net_process_streams + self.utility_streams 

128 

129 

130 # === Methods === 

131 def add_graph(self, name: str, result): 

132 self._graphs[name] = result 

133 

134 

135 def add_zone(self, zone_to_add, sub: bool =True): 

136 """Add a single zone object keyed by its name. 

137 

138 If the zone name already exists: 

139 - If the zone is identical (e.g. same stream and utility objects), skip. 

140 - If it's different, add it with a suffix like '_1', '_2', etc. 

141 """ 

142 base_name = getattr(zone_to_add, "name", None) 

143 

144 if not isinstance(base_name, str): 144 ↛ 145line 144 didn't jump to line 145 because the condition on line 144 was never true

145 raise ValueError(f"Zone must have a string 'name' attribute, got: {type(base_name).__name__}") 

146 

147 if sub: 147 ↛ 150line 147 didn't jump to line 150 because the condition on line 147 was always true

148 self._add_to_correct_zone_collection(zone_to_add, base_name, self._subzones) 

149 else: 

150 self._add_to_correct_zone_collection(zone_to_add, base_name, self._targets) 

151 

152 

153 def _add_to_correct_zone_collection(self, zone_to_add, base_name, loc): 

154 existing = loc.get(base_name) 

155 if existing: 155 ↛ 156line 155 didn't jump to line 156 because the condition on line 155 was never true

156 if self._zone_is_equal(existing, zone_to_add): 

157 return # identical, skip adding 

158 else: 

159 # Add with counter suffix until unique 

160 counter = 1 

161 new_name = f"{base_name}_{counter}" 

162 while new_name in loc: 

163 counter += 1 

164 new_name = f"{base_name}_{counter}" 

165 zone_to_add.name = new_name 

166 loc[new_name] = zone_to_add 

167 else: 

168 loc[base_name] = zone_to_add 

169 

170 

171 def add_zones(self, zones: dict, sub: bool = True): 

172 """Add multiple zones. Zones must be iterable of objects with a string .name.""" 

173 for z in zones: 

174 self.add_zone(z, sub) 

175 

176 

177 def add_target(self, target_to_add: Target): 

178 """Add one target to a specific zone.""" 

179 self._targets[target_to_add.name] = target_to_add 

180 

181 

182 def add_targets(self, targets: list): 

183 """Add multiple targets to a specific zone.""" 

184 for t in targets: 

185 self.add_target(t) 

186 

187 

188 def add_target_from_results(self, target_id: str = None, results: dict = None): 

189 target_name = f"{self.name}/{target_id}" if target_id is not None else self.name 

190 res = Target(target_name, target_id, self.parent_zone, config=self.config) 

191 for key, value in results.items(): 

192 setattr(res, key, value) 

193 self.add_target(res) 

194 

195 

196 def get_subzone(self, loc: str): 

197 loc_address = loc.split("/") 

198 try: 

199 z = self 

200 for sub in loc_address: 

201 z = z.subzones[sub] 

202 return z 

203 except: 

204 return ValueError("Subzone not found.") 

205 

206 

207 def calc_utility_cost(self): 

208 self._utility_cost = sum([u.ut_cost for u in self.utility_streams]) 

209 return self._utility_cost 

210 

211 

212 def _zone_is_equal(self, zone1: Zone, zone2: Zone): 

213 """Basic equality check between two zones. Customize as needed.""" 

214 return ( 

215 zone1._hot_streams == zone2._hot_streams and 

216 zone1._cold_streams == zone2._cold_streams and 

217 zone1._hot_utilities == zone2._hot_utilities and 

218 zone1._cold_utilities == zone2._cold_utilities 

219 ) 

220 

221 

222 def import_hot_and_cold_streams_from_sub_zones(self): 

223 """Get hot and cold streams from multiple subzones into two separate lists, maintaining references.""" 

224 z: Zone 

225 s: Stream 

226 for z in self.subzones.values(): 

227 if len(z.subzones) > 0: 

228 z.import_hot_and_cold_streams_from_sub_zones() 

229 

230 for s in z.hot_streams: 

231 key = f"{z.name}.{s.name}" 

232 self._hot_streams.add(s, key) 

233 

234 for s in z.cold_streams: 

235 key = f"{z.name}.{s.name}" 

236 self._cold_streams.add(s, key) 

237 

238 

239 def import_net_hot_and_cold_streams_from_sub_zones(self): 

240 """Get net hot and cold streams from multiple subzones into two separate lists, maintaining references.""" 

241 z: Zone 

242 s: Stream 

243 for z in self.subzones.values(): 

244 if len(z.subzones) > 0: 

245 z.import_hot_and_cold_streams_from_sub_zones() 

246 

247 for s in z.net_hot_streams: 247 ↛ 248line 247 didn't jump to line 248 because the loop on line 247 never started

248 key = f"{z.name}.{s.name}" 

249 self._net_hot_streams.add(s, key) 

250 

251 for s in z.net_cold_streams: 251 ↛ 252line 251 didn't jump to line 252 because the loop on line 251 never started

252 key = f"{z.name}.{s.name}" 

253 self._net_cold_streams.add(s, key)