Coverage for backend/pinch_service/OpenPinch/src/analysis/process_analysis.py: 96%
74 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1from typing import Tuple
2from copy import deepcopy
3from ..lib import *
4from ..classes import Zone, StreamCollection, ProblemTable, Stream
5from .problem_table_analysis import problem_table_algorithm
6from .utility_targeting import get_zonal_utility_targets
7from .support_methods import get_pinch_temperatures
10__all__ = ["get_process_pinch_targets"]
12#######################################################################################################
13# Public API
14#######################################################################################################
16def get_process_pinch_targets(zone: Zone):
17 """Main function that calls def functions to calculate all process-level Pinch Targets and prepare data for Total Zone."""
18 config = zone.config
19 hot_streams, cold_streams, all_streams = zone.hot_streams, zone.cold_streams, zone.all_streams
20 hot_utilities, cold_utilities = zone.hot_utilities, zone.cold_utilities
22 pt, pt_real, target_values = problem_table_algorithm(hot_streams, cold_streams, all_streams, config)
23 pt, pt_real, hot_utilities, cold_utilities = get_zonal_utility_targets(pt, pt_real, hot_utilities, cold_utilities)
24 net_hot_streams, net_cold_streams = _get_net_hot_and_cold_segments(zone.identifier, pt, hot_utilities, cold_utilities)
25 hot_pinch, cold_pinch = get_pinch_temperatures(pt)
27 # z = get_additional_zonal_pinch_analysis(z)
28 graphs = _save_graph_data(pt, pt_real)
29 zone.add_target_from_results(TargetType.DI.value, {
30 "pt": pt,
31 "pt_real": pt_real,
32 "target_values": target_values,
33 "graphs": graphs,
34 "hot_utilities": hot_utilities,
35 "cold_utilities": cold_utilities,
36 "net_hot_streams": net_hot_streams,
37 "net_cold_streams": net_cold_streams,
38 "hot_pinch": hot_pinch,
39 "cold_pinch": cold_pinch,
40 })
42 if len(zone.subzones) > 0:
43 for z in zone.subzones.values():
44 z = get_process_pinch_targets(z)
46 return zone
48#######################################################################################################
49# Helper functions
50#######################################################################################################
52def _get_net_hot_and_cold_segments(zone_identifier: str, pt: ProblemTable, hot_utilities: StreamCollection, cold_utilities: StreamCollection) -> Tuple[StreamCollection, StreamCollection]:
53 total_utility_demand = sum([u.heat_flow for u in hot_utilities]) + sum([u.heat_flow for u in cold_utilities])
54 if zone_identifier == ZoneType.P.value and total_utility_demand > ZERO:
55 net_hot_streams, net_cold_streams = _save_data_for_total_site_analysis(pt, PT.T.value, PT.H_NET_A.value, hot_utilities, cold_utilities)
56 else:
57 net_hot_streams, net_cold_streams = StreamCollection(), StreamCollection()
58 return net_hot_streams, net_cold_streams
61def _save_data_for_total_site_analysis(pt: ProblemTable, col_T: str, col_H: str, hot_utilities: StreamCollection, cold_utilities: StreamCollection) -> Tuple[StreamCollection, StreamCollection]:
62 """Constructs net stream segments that require utility input across temperature intervals."""
63 net_hot_streams = StreamCollection()
64 net_cold_streams = StreamCollection()
65 #Stephen changed the value from the ZERO constant to a very small number to deal with some janky behaviour
66 if pt.delta_col(col_T)[1:].min() < 0.00000000000000000000000000000000000001: 66 ↛ 67line 66 didn't jump to line 67 because the condition on line 66 was never true
67 raise ValueError("Infeasible temperature interval detected in _store_TSP_data")
69 T_vals = pt.col[col_T]
70 dh_vals = pt.delta_col(col_H)[1:]
72 hot_utilities = deepcopy(hot_utilities)
73 cold_utilities = deepcopy(cold_utilities)
75 hu: Stream = _initialise_utility_selected(hot_utilities)
76 cu: Stream = _initialise_utility_selected(cold_utilities)
78 k = 1
79 for i, dh in enumerate(dh_vals):
80 if dh > ZERO:
81 hu, hot_utilities, net_cold_streams, k = _add_net_segment(T_vals[i], T_vals[i+1], hu, dh, hot_utilities, net_cold_streams, k)
82 elif -dh > ZERO:
83 cu, cold_utilities, net_hot_streams, k = _add_net_segment(T_vals[i], T_vals[i+1], cu, dh, cold_utilities, net_hot_streams, k)
85 return net_hot_streams, net_cold_streams
88def _add_net_segment(T_ub: float, T_lb: float, curr_u: Stream, dh_req: float, utilities: StreamCollection, net_streams: StreamCollection, k: int, j: int = 0):
89 """Adds a net utility segment and recursively handles segmentation if needed."""
90 next_u = _advance_utility_if_needed(dh_req, curr_u, utilities)
92 dh_next = max(-curr_u.heat_flow, 0.0)
93 curr_u.heat_flow = max(curr_u.heat_flow, 0.0)
94 dh_curr = abs(dh_req) - dh_next
95 T_i = T_ub - (dh_curr / abs(dh_req)) * (T_ub - T_lb)
97 net_streams.add(
98 Stream(
99 name=f"Segment {k}" if j == 0 else f"Segment {k}-{j}",
100 t_supply=T_i if curr_u.type == StreamType.Hot.value else T_ub,
101 t_target=T_ub if curr_u.type == StreamType.Hot.value else T_i,
102 heat_flow=dh_curr,
103 dt_cont=curr_u.dt_cont,
104 htc=1.0, # Might be a way to calculate this.
105 is_process_stream=True
106 )
107 )
108 if dh_next > ZERO: 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true
109 return _add_net_segment(T_i, T_lb, next_u, dh_next, utilities, net_streams, k, j+1)
110 else:
111 return next_u, utilities, net_streams, k+1
114def _initialise_utility_selected(utilities: StreamCollection = None):
115 """Returns the first available utility with remaining capacity."""
116 for j in range(len(utilities)):
117 if utilities[j].heat_flow > ZERO:
118 return utilities[j]
119 return utilities[-1]
122def _advance_utility_if_needed(dh_used: float, current_u: Stream = None, utilities: StreamCollection = None) -> Stream:
123 """Advances to the next utility if the current one is exhausted."""
124 current_u.heat_flow -= abs(dh_used)
125 if current_u.heat_flow > ZERO:
126 return current_u
128 k = utilities.get_index(current_u) + 1
129 for j in range(k, len(utilities)):
130 if utilities[j].heat_flow > ZERO:
131 return utilities[j]
133 return utilities[-1]
136def _save_graph_data(pt: ProblemTable, pt_real: ProblemTable) -> Zone:
137 pt.round(decimals=4)
138 pt_real.round(decimals=4)
139 return {
140 GT.CC.value: pt_real[[PT.T.value, PT.H_HOT.value, PT.H_COLD.value]],
141 GT.SCC.value: pt[[PT.T.value, PT.H_HOT.value, PT.H_COLD.value]],
142 GT.GCC.value: pt[[PT.T.value, PT.H_NET.value]],
143 GT.GCC_NP.value: pt[[PT.T.value, PT.H_NET_NP.value]],
144 GT.GCC_Ex.value: pt[[PT.T.value, PT.H_NET_V.value]],
145 GT.GCC_Act.value: pt[[PT.T.value, PT.H_NET_A.value, PT.H_UT_NET.value]],
146 GT.SHL.value: pt[[PT.T.value, PT.H_HOT_NET.value, PT.H_COLD_NET.value]],
147 GT.GCC_Ut.value: pt_real[[PT.T.value, PT.H_UT_NET.value]],
148 GT.GCC_Ut_star.value: pt[[PT.T.value, PT.H_UT_NET.value]],
149 }