Coverage for backend/pinch_service/OpenPinch/src/analysis/data_preparation.py: 86%

209 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1import copy 

2from typing import List, Tuple 

3from ..lib import * 

4from .support_methods import get_value 

5from ..classes import Zone, Stream, StreamCollection 

6 

7 

# Names exported by `from <module> import *`. The spelling "struture" matches
# the public function defined in this module and is kept so existing callers
# continue to work.
__all__ = ["prepare_problem_struture"]

9 

10####################################################################################################### 

11# Public API 

12####################################################################################################### 

13 

def prepare_problem_struture(streams: List[StreamSchema] = None, utilities: List[UtilitySchema] = None, options: Configuration = None, project_name: str = "Site", zone_tree: ZoneTreeSchema = None) -> Zone:
    """Build the top-level ``Zone`` used for pinch analysis from raw input data.

    The input is validated, a zone hierarchy is created (from ``zone_tree`` when
    given, otherwise from the zone names found on the streams), process streams
    are attached to their zones, and hot/cold utilities (including default ones
    when required) are attached to the top zone and every subzone.

    Args:
        streams: Stream inputs describing the process streams. At least one
            stream is required.
        utilities: Utility inputs (hot and cold). ``None`` means no utilities
            were supplied. The list is extended in place when default
            utilities are added, and its entries are updated in place.
        options: Options forwarded to ``Configuration``.
        project_name: Name used for the top-level zone when ``zone_tree`` is
            not a ``ZoneTreeSchema``.
        zone_tree: Optional description of the zone hierarchy.

    Returns:
        Zone: The top-level zone with streams and utilities assigned.

    Raises:
        ValueError: If no streams are supplied or a zone type is not recognised.
    """
    # Fresh lists per call: a shared ``[]`` default would accumulate the default
    # utilities appended further down and leak them into later calls.
    streams = [] if streams is None else streams
    utilities = [] if utilities is None else utilities

    top_zone_name, top_zone_identifier = _get_validated_zone_info(zone_tree, project_name)
    config = Configuration(
        options=options,
        top_zone_name=top_zone_name,
        top_zone_identifier=top_zone_identifier,
    )
    zone_tree, streams, utilities, config = _validate_input_data(zone_tree, streams, utilities, config)

    master_zone = Zone(name=config.TOP_ZONE_NAME, identifier=config.TOP_ZONE_IDENTIFIER, config=config)
    master_zone = _create_nested_zones(master_zone, zone_tree, master_zone.config)
    # Streams are processed in name order.
    master_zone = _get_process_streams_in_each_subzone(master_zone, sorted(streams, key=lambda x: x.name))
    master_zone.import_hot_and_cold_streams_from_sub_zones()

    hot_utilities, cold_utilities = _get_hot_and_cold_utilities(utilities, master_zone.hot_streams, master_zone.cold_streams, master_zone.config)
    master_zone = _set_utilities_for_zone_and_subzones(master_zone, hot_utilities, cold_utilities)
    return master_zone

43 

44####################################################################################################### 

45# Helper Functions 

46####################################################################################################### 

47 

def _get_validated_zone_info(zone_tree: ZoneTreeSchema, project_name: str = None) -> Tuple[str, str]:
    """Return the ``(name, type identifier)`` pair for the zone described by ``zone_tree``.

    When ``zone_tree`` is not a ``ZoneTreeSchema`` the zone is treated as a site
    named ``project_name``.

    Raises:
        ValueError: If ``zone_tree.type`` is not one of the recognised labels.
    """
    if not isinstance(zone_tree, ZoneTreeSchema):
        return project_name, ZoneType.S.value

    # Input labels mapped to the name of the matching ``ZoneType`` member.
    label_table = (
        (("Zone", "Sub-Zone", "Process Zone"), "P"),
        (("Site",), "S"),
        (("Community",), "C"),
        (("Region",), "R"),
        (("Utility Zone",), "U"),
    )
    declared_type = zone_tree.type
    for labels, member_name in label_table:
        if declared_type in labels:
            resolved_type = getattr(ZoneType, member_name).value
            return zone_tree.name, resolved_type

    raise ValueError("Zone name and type could not be identified correctly.")

68 

69 

def _validate_input_data(zone_tree: ZoneTreeSchema = None, streams: List[StreamSchema] = None, utilities: List[UtilitySchema] = None, config: Configuration = None):
    """Check the input data for completeness and fill gaps with general assumptions.

    Args:
        zone_tree: Optional zone hierarchy description.
        streams: Stream inputs; at least one is required.
        utilities: Utility inputs; ``None`` is replaced by an empty list.
        config: Configuration to validate. A new ``Configuration()`` is created
            when omitted.

    Returns:
        The tuple ``(zone_tree, streams, utilities, config)`` after validation.

    Raises:
        ValueError: If no streams are supplied or the zone tree is invalid.
    """
    # Defaults are created per call: the config validator mutates its argument,
    # so a default built once at definition time would carry state between calls.
    streams = [] if streams is None else streams
    config = Configuration() if config is None else config

    streams = _validate_streams_passed_in(streams)
    utilities = _validate_utilities_passed_in(utilities)
    config = _validate_config_data_completed(config)
    zone_tree = _validate_zone_tree_structure(zone_tree, streams, config.TOP_ZONE_NAME)
    return zone_tree, streams, utilities, config

77 

78 

def _create_nested_zones(parent_zone: Zone, zone_tree: ZoneTreeSchema, config: Configuration) -> Zone:
    """Attach a ``Zone`` for every descendant in ``zone_tree`` beneath ``parent_zone``.

    Each child schema becomes a subzone of its parent and is then expanded
    recursively. ``parent_zone`` is returned once its subtree is built.
    """
    # A missing or empty ``children`` means this node is a leaf.
    for node in zone_tree.children or ():
        new_zone = Zone(
            name=node.name,
            identifier=node.type,
            config=config,
            parent_zone=parent_zone,
        )
        parent_zone.add_zone(new_zone, sub=True)
        _create_nested_zones(new_zone, node, config)
    return parent_zone

95 

96 

def _get_process_streams_in_each_subzone(master_zone: Zone, streams: List[StreamSchema]) -> Zone:
    """Attach the matching process streams to ``master_zone`` and every zone below it."""
    # Collect the zones in pre-order (parent first, then each subzone subtree
    # in the order the subzones are stored) before touching any of them.
    ordered_zones: List[Zone] = []
    pending = [master_zone]
    while pending:
        zone = pending.pop()
        ordered_zones.append(zone)
        # Reversed so that the first subzone is popped, and visited, first.
        pending.extend(reversed(list(zone.subzones.values())))

    for zone in ordered_zones:
        _add_process_streams_under_zones(zone, streams)
    return master_zone

111 

112 

def _create_process_stream(stream: StreamSchema) -> Stream:
    """Build a process ``Stream`` from a ``StreamSchema``.

    Each numeric field is passed through ``get_value`` before being handed to
    the ``Stream`` constructor.
    """
    numeric_fields = ("t_supply", "t_target", "heat_flow", "dt_cont", "htc")
    values = {field: get_value(getattr(stream, field)) for field in numeric_fields}
    return Stream(name=stream.name, is_process_stream=True, **values)

125 

126 

def _add_process_streams_under_zones(z: Zone, streams: List[StreamSchema]) -> Zone:
    """Add to zone ``z`` every stream whose ``zone`` field refers to it.

    A stream belongs to ``z`` when the last ``/``-separated segment of its
    ``zone`` equals ``z.name``, when its ``zone`` equals ``z.name``, or when its
    ``zone`` equals the full path from the top zone down to ``z``.

    Args:
        z: Zone receiving the streams.
        streams: Candidate stream inputs.

    Returns:
        Zone: ``z`` with matching streams added to ``hot_streams`` or
        ``cold_streams``.
    """

    def _get_zone_path_from_child(child_zone: Zone, delimiter="/") -> str:
        """Join the zone names from the top zone down to ``child_zone`` via ``parent_zone`` links."""
        path_parts = []
        current = child_zone
        while current is not None:
            path_parts.append(current.name)
            current = current.parent_zone
        return delimiter.join(reversed(path_parts))

    zone_path = _get_zone_path_from_child(z)

    for s in streams:
        stream_zone_name = s.zone.split("/")[-1]
        if not (stream_zone_name == z.name or s.zone == z.name or s.zone == zone_path):
            continue

        stream_j: Stream = _create_process_stream(s)
        if stream_j.type == StreamType.Hot.value:
            key = ".".join([s.zone, StreamLoc.HotStr.value, s.name])
            z.hot_streams.add(stream_j, key)
        else:
            key = ".".join([s.zone, StreamLoc.ColdStr.value, s.name])
            # The key was previously built but not passed on; cold streams are
            # now registered under their zone-qualified key like hot streams.
            z.cold_streams.add(stream_j, key)
    return z

154 

155 

def _get_hot_and_cold_utilities(utilities: List[UtilitySchema], hot_streams: List[Stream], cold_streams: List[Stream], config: Configuration) -> Tuple[List[Stream], List[Stream]]:
    """Turn the utility inputs into hot and cold utility stream collections.

    The utility data is completed with defaults, default utilities are appended
    when flagged as needed, and the hot list is built before the cold list.
    """
    hu_t_min, cu_t_max = _find_extreme_process_temperatures(hot_streams, cold_streams)
    completed, need_default_hu, need_default_cu = _complete_utility_data(utilities, config, hu_t_min, cu_t_max)
    pool = _add_default_utilities(completed, config, need_default_hu, need_default_cu, hu_t_min, cu_t_max)

    # The pool returned by the hot pass feeds the cold pass.
    hot_utilities, pool = _create_utilities_list(pool, utility_type=StreamType.Hot.value)
    cold_utilities, pool = _create_utilities_list(pool, utility_type=StreamType.Cold.value)
    return hot_utilities, cold_utilities

164 

165 

def _find_extreme_process_temperatures(hot_streams: List[Stream], cold_streams: List[Stream]) -> Tuple[float, float]:
    """Return ``(HU_T_min, CU_T_max)`` for the given process streams.

    ``HU_T_min`` is the largest ``t_max_star`` over the cold streams and
    ``CU_T_max`` is the smallest ``t_min_star`` over the hot streams. With no
    streams on a side the sentinels ``-1e9`` and ``1e9`` are returned.
    """
    # Seeding each search with its sentinel keeps the result bounded by it.
    lowest_hot = min([1e9, *(stream.t_min_star for stream in hot_streams)])
    highest_cold = max([-1e9, *(stream.t_max_star for stream in cold_streams)])
    return highest_cold, lowest_hot

178 

179 

def _complete_utility_data(utilities: List[UtilitySchema], config: Configuration, HU_T_min: float, CU_T_max: float) -> Tuple[List[UtilitySchema], bool, bool]:
    """Fill in missing utility data and decide whether default utilities are needed.

    Each utility is updated in place: ``t_supply``, ``t_target``, ``dt_cont``,
    ``price`` and ``htc`` are replaced by their ``get_value`` results, with
    configuration defaults substituted for missing entries.

    Args:
        utilities: Utility inputs to complete (modified in place).
        config: Source of the default values.
        HU_T_min: Temperature an active hot utility must reach for the default
            hot utility to be skipped.
        CU_T_max: Temperature an active cold utility must reach for the default
            cold utility to be skipped.

    Returns:
        ``(utilities, addDefaultHU, addDefaultCU)`` where the flags stay
        ``True`` unless an active utility already satisfies the check.
    """
    utility: UtilitySchema
    addDefaultHU = True
    addDefaultCU = True

    for utility in utilities:
        t_supply = get_value(utility.t_supply)
        utility.t_supply = t_supply

        # A missing target, or one equal to the supply, is replaced by a
        # DTGLIDE offset from the supply temperature.
        t_target = get_value(utility.t_target)
        if t_target is None or t_target == t_supply:
            utility.t_target = utility.t_supply - config.DTGLIDE if utility.type == "Hot" else utility.t_supply + config.DTGLIDE
        else:
            utility.t_target = t_target

        dt_cont = get_value(utility.dt_cont)
        utility.dt_cont = config.DTCONT if dt_cont is None else dt_cont

        # NOTE(review): this default is scaled by ANNUAL_OP_TIME, while
        # _create_default_utility uses UTILITY_PRICE unscaled - confirm units.
        price = get_value(utility.price)
        utility.price = config.UTILITY_PRICE * config.ANNUAL_OP_TIME if price is None else price

        # A zero heat transfer coefficient is treated the same as a missing one.
        htc = get_value(utility.htc)
        utility.htc = config.HTC if htc is None or htc == 0 else htc

        if (utility.type in ["Hot", "Both"] and utility.active and min(utility.t_supply, utility.t_target) - utility.dt_cont >= HU_T_min):
            addDefaultHU = False
        if (utility.type in ["Cold", "Both"] and utility.active and max(utility.t_supply, utility.t_target) - utility.dt_cont <= CU_T_max):
            addDefaultCU = False
    return utilities, addDefaultHU, addDefaultCU

212 

213 

def _add_default_utilities(utilities: List[UtilitySchema], config: Configuration, addDefaultHU: bool, addDefaultCU: bool, HU_T_min: float, CU_T_max: float) -> List[UtilitySchema]:
    """Append the default hot ("HU") and/or cold ("CU") utility when flagged.

    ``utilities`` is extended in place and also returned. The hot default, if
    requested, is appended before the cold default.
    """
    default_specs = (
        (addDefaultHU, "HU", "Hot", HU_T_min),
        (addDefaultCU, "CU", "Cold", CU_T_max),
    )
    for requested, label, kind, temperature in default_specs:
        if requested:
            utilities.append(_create_default_utility(label, kind, temperature, config))
    return utilities

226 

227 

def _create_default_utility(name: str, ut_type: str, T: float, config: Configuration) -> UtilitySchema:
    """Create a default utility of the given type positioned relative to ``T``.

    A "Hot" utility is offset upwards from ``T`` and any other type downwards:
    the supply temperature by ``DTCONT`` and the target temperature by
    ``DTCONT - DTGLIDE``. The remaining fields come from ``config``.
    """
    direction = -1
    if ut_type == "Hot":
        direction = 1

    payload = {
        "name": name,
        "type": ut_type,
        "t_supply": T + config.DTCONT * direction,
        "t_target": T + (config.DTCONT - config.DTGLIDE) * direction,
        "heat_flow": 0,
        "dt_cont": config.DTCONT,
        "price": config.UTILITY_PRICE,
        "htc": config.HTC,
    }
    return UtilitySchema.model_validate(payload)

242 

243 

def _create_utilities_list(utilities: List[UtilitySchema], utility_type: str) -> Tuple[List[Stream], List[UtilitySchema]]:
    """Create the utility ``Stream`` objects of one type in decreasing supply-temperature order.

    Active utilities whose ``type`` is ``utility_type`` or ``"Both"`` are picked
    one per pass, each pass taking the highest ``t_supply`` strictly below the
    previously picked one, and are converted to ``Stream`` objects.

    Args:
        utilities: Completed utility inputs. Picked entries whose ``type`` is
            exactly ``utility_type`` have ``active`` set to ``False``.
        utility_type: The stream type to extract (compared against "Hot" and
            ``StreamType.Hot.value`` below).

    Returns:
        ``(created_utilities, utilities)``: a ``StreamCollection`` holding the
        created streams, and the input list.
    """
    created_utilities = StreamCollection()
    # Upper bound for the next pick; starts above any expected temperature.
    T_prev = 1e9

    # Find the first utility of the specified type
    # NOTE(review): if nothing matches, `selected` is left bound to the last
    # list entry and the loop below may still turn it into a stream - confirm
    # callers always supply at least one matching utility.
    for selected in utilities:
        if selected.type in ["Both", utility_type] and selected.active:
            break

    prev_selected_name = ""

    # At most one utility is created per pass, so len(utilities) passes suffice.
    for _ in range(len(utilities)):
        # Cycle through all utilities as candidates, comparing each candidate against the previous utility selected and the best found (U)
        for candidate in utilities:
            is_valid = (
                candidate.type in ["Both", utility_type]
                and candidate.t_supply < T_prev
                and (candidate.t_supply >= selected.t_supply or selected.name == prev_selected_name)
                and candidate.active
            )
            if is_valid:
                selected = candidate

        # If no different utility is identified, break
        if selected.name == prev_selected_name:
            break

        T_prev = selected.t_supply
        # Only exact-type utilities are deactivated; "Both" utilities stay
        # active so a later call for the other type can pick them up too.
        if selected.type in [utility_type]:
            selected.active = False

        # Orient the temperatures: hot utilities run from high to low, others
        # from low to high.
        if utility_type == "Hot":
            t_supply = max(selected.t_supply, selected.t_target)
            t_target = min(selected.t_supply, selected.t_target)
        else:
            t_supply = min(selected.t_supply, selected.t_target)
            t_target = max(selected.t_supply, selected.t_target)

        # Create utility
        key = ".".join([StreamLoc.HotU.value, selected.name]) if utility_type == StreamType.Hot.value else ".".join([StreamLoc.ColdU.value, selected.name])
        # NOTE(review): dt_cont is passed as the fourth positional argument -
        # confirm it lines up with Stream's parameter order.
        created_utilities.add(
            Stream(
                selected.name,
                t_supply,
                t_target,
                selected.dt_cont,
                htc=selected.htc,
                price=selected.price,
                is_process_stream=False,
            ),
            key
        )
        prev_selected_name = selected.name

    return created_utilities, utilities

300 

301 

def _set_utilities_for_zone_and_subzones(zone: Zone, hot_utilities: List[Stream], cold_utilities: List[Stream]) -> Zone:
    """Give ``zone`` and every zone beneath it its own deep copy of the utilities."""
    # Pre-order walk: a zone is handled before its subzones, and subzones are
    # handled in the order they are stored.
    pending = [zone]
    while pending:
        current = pending.pop()
        current.hot_utilities.add_many(copy.deepcopy(hot_utilities))
        current.cold_utilities.add_many(copy.deepcopy(cold_utilities))
        pending.extend(reversed(list(current.subzones.values())))
    return zone

309 

310 

def _validate_zone_tree_structure(zone_tree: ZoneTreeSchema = None, streams: List[StreamSchema] = None, top_zone_name: str = None) -> ZoneTreeSchema:
    """Return a validated zone tree, building one from the stream zones if none is given.

    When ``zone_tree`` is a ``ZoneTreeSchema`` every node's name and type are
    normalised in place through ``_get_validated_zone_info``. Otherwise a tree
    is built under a site-type root named ``top_zone_name``, with one branch
    per distinct ``/``-separated stream zone path.

    Args:
        zone_tree: Optional user-supplied zone hierarchy.
        streams: Stream inputs whose ``zone`` fields define the hierarchy when
            ``zone_tree`` is not supplied.
        top_zone_name: Name of the root zone; falls back to
            ``ZoneType.S.value`` when it is not a string.

    Returns:
        ZoneTreeSchema: The normalised or newly built zone tree.

    Raises:
        ValueError: If the top-level zone is a utility zone or a zone type is
            not recognised.
    """
    if isinstance(zone_tree, ZoneTreeSchema):
        if zone_tree.type == ZoneType.U.value:
            raise ValueError("Pinch analysis does not apply to Utility Zones.")

        def _check_zone_tree(parent_schema: ZoneTreeSchema) -> ZoneTreeSchema:
            """Recursively normalise the name and type of every node in the tree."""
            zone_name, zone_type = _get_validated_zone_info(parent_schema)
            parent_schema.name = zone_name
            parent_schema.type = zone_type
            for child_schema in parent_schema.children or ():
                _check_zone_tree(child_schema)
            return parent_schema

        return _check_zone_tree(zone_tree)

    # Build zone tree from stream zone names
    streams = [] if streams is None else streams
    if not isinstance(top_zone_name, str):
        top_zone_name = ZoneType.S.value

    root = {"name": top_zone_name, "type": ZoneType.S.value, "children": {}}
    # Empty or missing zone names are ignored; sorting fixes the insertion order.
    zone_names = sorted({stream.zone for stream in streams if stream.zone})

    def _split_zone_name(name: str):
        """Split a zone path on "/" into stripped, non-empty segments."""
        if "/" in name:
            return [z.strip() for z in name.split("/") if z.strip()]
        return [name]

    for zone_name in zone_names:
        current = root
        for z_name in _split_zone_name(zone_name):
            if z_name not in current["children"]:
                current["children"][z_name] = {
                    "name": z_name,
                    "type": ZoneType.P.value,
                    "children": {},
                }
            current = current["children"][z_name]

    def _build_tree(node_dict):
        """Convert the nested dict structure into ``ZoneTreeSchema`` objects."""
        children = [_build_tree(child) for child in node_dict["children"].values()]
        return ZoneTreeSchema(
            name=node_dict["name"],
            type=node_dict["type"],
            children=children if children else None,
        )

    return ZoneTreeSchema.model_validate(_build_tree(root))

370 

371 

def _validate_streams_passed_in(streams: List[StreamSchema]) -> list:
    """Return ``streams`` unchanged, insisting that at least one stream is present.

    Args:
        streams: The stream inputs to check.

    Returns:
        The same ``streams`` object.

    Raises:
        ValueError: If ``streams`` is ``None`` or empty.
    """
    # ``None`` is reported as "no streams" rather than failing inside ``len``.
    if streams is None or len(streams) == 0:
        raise ValueError("At least one stream is required")
    return streams

377 

378 

def _validate_utilities_passed_in(utilities: List[UtilitySchema]) -> list:
    """Return ``utilities`` as given, or a new empty list when it is ``None``."""
    if utilities is None:
        return []
    return utilities

382 

383 

def _validate_config_data_completed(config: Configuration) -> Configuration:
    """Replace configuration values that do not make logical sense.

    ``config`` is corrected in place and returned:
    a non-numeric or zero ``ANNUAL_OP_TIME`` becomes ``365 * 24``;
    ``P_TURBINE_BOX`` above 220 becomes 200 when ``TURBINE_WORK_BUTTON`` is
    ``True``; a non-positive ``DTGLIDE`` becomes 0.01; a negative ``DTCONT``
    becomes 0.0.
    """
    # Annual operation time must be a usable number.
    op_time = config.ANNUAL_OP_TIME
    if not isinstance(op_time, (float, int)) or op_time == 0:
        config.ANNUAL_OP_TIME = 365 * 24  # h/y

    # Ensures the inlet pressure to the turbine is below the critical pressure
    # TODO: Add units to the turbine pressure
    turbine_enabled = config.TURBINE_WORK_BUTTON is True
    if turbine_enabled and config.P_TURBINE_BOX > 220:
        config.P_TURBINE_BOX = 200

    if config.DTGLIDE <= 0:
        config.DTGLIDE = 0.01

    if config.DTCONT < 0:
        config.DTCONT = 0.0

    return config

397 return config