Coverage for backend/pinch_service/OpenPinch/src/analysis/process_analysis.py: 96%

74 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1from typing import Tuple 

2from copy import deepcopy 

3from ..lib import * 

4from ..classes import Zone, StreamCollection, ProblemTable, Stream 

5from .problem_table_analysis import problem_table_algorithm 

6from .utility_targeting import get_zonal_utility_targets 

7from .support_methods import get_pinch_temperatures 

8 

9 

10__all__ = ["get_process_pinch_targets"] 

11 

12####################################################################################################### 

13# Public API 

14####################################################################################################### 

15 

16def get_process_pinch_targets(zone: Zone): 

17 """Main function that calls def functions to calculate all process-level Pinch Targets and prepare data for Total Zone.""" 

18 config = zone.config 

19 hot_streams, cold_streams, all_streams = zone.hot_streams, zone.cold_streams, zone.all_streams 

20 hot_utilities, cold_utilities = zone.hot_utilities, zone.cold_utilities 

21 

22 pt, pt_real, target_values = problem_table_algorithm(hot_streams, cold_streams, all_streams, config) 

23 pt, pt_real, hot_utilities, cold_utilities = get_zonal_utility_targets(pt, pt_real, hot_utilities, cold_utilities) 

24 net_hot_streams, net_cold_streams = _get_net_hot_and_cold_segments(zone.identifier, pt, hot_utilities, cold_utilities) 

25 hot_pinch, cold_pinch = get_pinch_temperatures(pt) 

26 

27 # z = get_additional_zonal_pinch_analysis(z) 

28 graphs = _save_graph_data(pt, pt_real) 

29 zone.add_target_from_results(TargetType.DI.value, { 

30 "pt": pt, 

31 "pt_real": pt_real, 

32 "target_values": target_values, 

33 "graphs": graphs, 

34 "hot_utilities": hot_utilities, 

35 "cold_utilities": cold_utilities, 

36 "net_hot_streams": net_hot_streams, 

37 "net_cold_streams": net_cold_streams, 

38 "hot_pinch": hot_pinch, 

39 "cold_pinch": cold_pinch, 

40 }) 

41 

42 if len(zone.subzones) > 0: 

43 for z in zone.subzones.values(): 

44 z = get_process_pinch_targets(z) 

45 

46 return zone 

47 

48####################################################################################################### 

49# Helper functions 

50####################################################################################################### 

51 

52def _get_net_hot_and_cold_segments(zone_identifier: str, pt: ProblemTable, hot_utilities: StreamCollection, cold_utilities: StreamCollection) -> Tuple[StreamCollection, StreamCollection]: 

53 total_utility_demand = sum([u.heat_flow for u in hot_utilities]) + sum([u.heat_flow for u in cold_utilities]) 

54 if zone_identifier == ZoneType.P.value and total_utility_demand > ZERO: 

55 net_hot_streams, net_cold_streams = _save_data_for_total_site_analysis(pt, PT.T.value, PT.H_NET_A.value, hot_utilities, cold_utilities) 

56 else: 

57 net_hot_streams, net_cold_streams = StreamCollection(), StreamCollection() 

58 return net_hot_streams, net_cold_streams 

59 

60 

61def _save_data_for_total_site_analysis(pt: ProblemTable, col_T: str, col_H: str, hot_utilities: StreamCollection, cold_utilities: StreamCollection) -> Tuple[StreamCollection, StreamCollection]: 

62 """Constructs net stream segments that require utility input across temperature intervals.""" 

63 net_hot_streams = StreamCollection() 

64 net_cold_streams = StreamCollection() 

65 #Stephen changed the value from the ZERO constant to a very small number to deal with some janky behaviour 

66 if pt.delta_col(col_T)[1:].min() < 0.00000000000000000000000000000000000001: 66 ↛ 67line 66 didn't jump to line 67 because the condition on line 66 was never true

67 raise ValueError("Infeasible temperature interval detected in _store_TSP_data") 

68 

69 T_vals = pt.col[col_T] 

70 dh_vals = pt.delta_col(col_H)[1:] 

71 

72 hot_utilities = deepcopy(hot_utilities) 

73 cold_utilities = deepcopy(cold_utilities) 

74 

75 hu: Stream = _initialise_utility_selected(hot_utilities) 

76 cu: Stream = _initialise_utility_selected(cold_utilities) 

77 

78 k = 1 

79 for i, dh in enumerate(dh_vals): 

80 if dh > ZERO: 

81 hu, hot_utilities, net_cold_streams, k = _add_net_segment(T_vals[i], T_vals[i+1], hu, dh, hot_utilities, net_cold_streams, k) 

82 elif -dh > ZERO: 

83 cu, cold_utilities, net_hot_streams, k = _add_net_segment(T_vals[i], T_vals[i+1], cu, dh, cold_utilities, net_hot_streams, k) 

84 

85 return net_hot_streams, net_cold_streams 

86 

87 

88def _add_net_segment(T_ub: float, T_lb: float, curr_u: Stream, dh_req: float, utilities: StreamCollection, net_streams: StreamCollection, k: int, j: int = 0): 

89 """Adds a net utility segment and recursively handles segmentation if needed.""" 

90 next_u = _advance_utility_if_needed(dh_req, curr_u, utilities) 

91 

92 dh_next = max(-curr_u.heat_flow, 0.0) 

93 curr_u.heat_flow = max(curr_u.heat_flow, 0.0) 

94 dh_curr = abs(dh_req) - dh_next 

95 T_i = T_ub - (dh_curr / abs(dh_req)) * (T_ub - T_lb) 

96 

97 net_streams.add( 

98 Stream( 

99 name=f"Segment {k}" if j == 0 else f"Segment {k}-{j}", 

100 t_supply=T_i if curr_u.type == StreamType.Hot.value else T_ub, 

101 t_target=T_ub if curr_u.type == StreamType.Hot.value else T_i, 

102 heat_flow=dh_curr, 

103 dt_cont=curr_u.dt_cont, 

104 htc=1.0, # Might be a way to calculate this. 

105 is_process_stream=True 

106 ) 

107 ) 

108 if dh_next > ZERO: 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true

109 return _add_net_segment(T_i, T_lb, next_u, dh_next, utilities, net_streams, k, j+1) 

110 else: 

111 return next_u, utilities, net_streams, k+1 

112 

113 

114def _initialise_utility_selected(utilities: StreamCollection = None): 

115 """Returns the first available utility with remaining capacity.""" 

116 for j in range(len(utilities)): 

117 if utilities[j].heat_flow > ZERO: 

118 return utilities[j] 

119 return utilities[-1] 

120 

121 

122def _advance_utility_if_needed(dh_used: float, current_u: Stream = None, utilities: StreamCollection = None) -> Stream: 

123 """Advances to the next utility if the current one is exhausted.""" 

124 current_u.heat_flow -= abs(dh_used) 

125 if current_u.heat_flow > ZERO: 

126 return current_u 

127 

128 k = utilities.get_index(current_u) + 1 

129 for j in range(k, len(utilities)): 

130 if utilities[j].heat_flow > ZERO: 

131 return utilities[j] 

132 

133 return utilities[-1] 

134 

135 

136def _save_graph_data(pt: ProblemTable, pt_real: ProblemTable) -> Zone: 

137 pt.round(decimals=4) 

138 pt_real.round(decimals=4) 

139 return { 

140 GT.CC.value: pt_real[[PT.T.value, PT.H_HOT.value, PT.H_COLD.value]], 

141 GT.SCC.value: pt[[PT.T.value, PT.H_HOT.value, PT.H_COLD.value]], 

142 GT.GCC.value: pt[[PT.T.value, PT.H_NET.value]], 

143 GT.GCC_NP.value: pt[[PT.T.value, PT.H_NET_NP.value]], 

144 GT.GCC_Ex.value: pt[[PT.T.value, PT.H_NET_V.value]], 

145 GT.GCC_Act.value: pt[[PT.T.value, PT.H_NET_A.value, PT.H_UT_NET.value]], 

146 GT.SHL.value: pt[[PT.T.value, PT.H_HOT_NET.value, PT.H_COLD_NET.value]], 

147 GT.GCC_Ut.value: pt_real[[PT.T.value, PT.H_UT_NET.value]], 

148 GT.GCC_Ut_star.value: pt[[PT.T.value, PT.H_UT_NET.value]], 

149 }