Coverage for backend/pinch_service/OpenPinch/src/analysis/graphs.py: 77%
148 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1from typing import List, Tuple
2from ..lib.enums import *
3from ..utils import *
4from .support_methods import *
5from ..classes import *
7DECIMAL_PLACES = 2
9__all__ = ["get_output_graphs, visualise_graphs"]
12#######################################################################################################
13# Public API
14#######################################################################################################
16def get_output_graphs(zone: Zone, graph_sets: Dict = {}) -> Dict:
17 """Returns Json data points for each process."""
18 for key, t in zone.targets.items():
19 graph_sets[key] = _create_graph_set(t, key)
21 if len(zone.subzones) > 0:
22 for z in zone.subzones.values():
23 graph_sets = get_output_graphs(z, graph_sets)
25 return graph_sets
28def visualise_graphs(graph_set: dict, graph) -> None:
29 """Adds a graph to the graph_set based on its type."""
30 graph_data = graph.data
31 graph_type = graph.type
33 match graph_type:
34 case GT.CC.value | GT.SCC.value:
35 curves = [
36 _graph_cc(StreamType.Hot.value, graph_data[PT.T.value].to_list(), graph_data[PT.H_HOT.value].to_list()),
37 _graph_cc(StreamType.Cold.value, graph_data[PT.T.value].to_list(), graph_data[PT.H_COLD.value].to_list())
38 ]
39 graph_set['graphs'].append({
40 'type': graph_type,
41 'name': f'{graph_type} Graph',
42 'segments': curves[0] + curves[1]
43 })
45 case GT.BCC.value:
46 curves = [
47 _graph_cc(StreamType.Hot.value, graph_data[PT.T.value].to_list(), graph_data[PT.H_HOT.value].to_list(), IncludeArrows=False),
48 _graph_cc(StreamType.Cold.value, graph_data[PT.T.value].to_list(), graph_data[PT.H_COLD.value].to_list(), IncludeArrows=False)
49 ]
50 graph_set['graphs'].append({
51 'type': graph_type,
52 'name': f'{graph_type} Graph',
53 'segments': curves[0] + curves[1]
54 })
56 case GT.GCC.value | GT.GCC_NP.value | GT.GCCU.value:
57 segments = _graph_gcc(graph_data[PT.H_NET.value].to_list(), graph_data[PT.T.value].to_list())
58 graph_set['graphs'].append({
59 'type': graph_type,
60 'name': f'{graph_type} Graph',
61 'segments': segments
62 })
63 if graph_type == GT.GCCU.value:
64 # Add second set with utility profile style
65 segments_ut = _graph_gcc(graph_data[PT.H_NET.value].to_list(), graph_data[PT.T.value].to_list(), utility_profile=True)
66 graph_set['graphs'].append({
67 'type': f'{graph_type}_Utility',
68 'name': f'{graph_type} Utility Graph',
69 'segments': segments_ut
70 })
73#######################################################################################################
74# Helper Functions
75#######################################################################################################
77def _create_graph_set(t: Target, graphTitle: str) -> dict:
78 """Creates pinch analysis and total site analysis graphs for a specifc zone."""
80 graph_set = {
81 'name': graphTitle,
82 'graphs': []
83 }
85 # === Composite Curve ===
86 if GT.CC.value in t.graphs:
87 key = GT.CC.value
88 g = {
89 'type': key,
90 'name': f'Composite Curve: {graphTitle}',
91 'segments':
92 _graph_cc(
93 StreamType.Hot.value,
94 t.graphs[key][PT.T.value].to_list(),
95 t.graphs[key][PT.H_HOT.value].to_list(),
96 ) +
97 _graph_cc(
98 StreamType.Cold.value,
99 t.graphs[key][PT.T.value].to_list(),
100 t.graphs[key][PT.H_COLD.value].to_list(),
101 ),
102 }
103 graph_set['graphs'].append(g)
105 # === Shifted Composite Curve ===
106 if GT.SCC.value in t.graphs:
107 key = GT.SCC.value
108 g = {
109 'type': key,
110 'name': f'Shifted Composite Curve: {graphTitle}',
111 'segments':
112 _graph_cc(
113 StreamType.Hot.value,
114 t.graphs[key][PT.T.value].to_list(),
115 t.graphs[key][PT.H_HOT.value].to_list(),
116 ) +
117 _graph_cc(
118 StreamType.Cold.value,
119 t.graphs[key][PT.T.value].to_list(),
120 t.graphs[key][PT.H_COLD.value].to_list(),
121 ),
122 }
123 graph_set['graphs'].append(g)
125 # === Balanced Composite Curve ===
126 if GT.BCC.value in t.graphs: 126 ↛ 127line 126 didn't jump to line 127 because the condition on line 126 was never true
127 key = GT.BCC.value
128 g = {
129 'type': key,
130 'name': f'Balanced Composite Curve: {graphTitle}',
131 'segments':
132 _graph_cc(
133 StreamType.Hot.value,
134 t.graphs[key][PT.T.value].to_list(),
135 t.graphs[key][PT.H_HOT.value].to_list(),
136 ) +
137 _graph_cc(
138 StreamType.Cold.value,
139 t.graphs[key][PT.T.value].to_list(),
140 t.graphs[key][PT.H_COLD.value].to_list(),
141 ),
142 }
143 graph_set['graphs'].append(g)
145 # === Grand Composite Curve (GCC) ===
146 if GT.GCC.value in t.graphs:
147 key = GT.GCC.value
148 g = {
149 'type': key,
150 'name': f'Grand Composite Curve: {graphTitle}',
151 'segments':
152 _graph_gcc(
153 t.graphs[key][PT.T.value].to_list(),
154 t.graphs[key][PT.H_NET.value].to_list(),
155 ),
156 }
157 graph_set['graphs'].append(g)
159 # === Grand Composite Curve with no pockets (GCC_Act and GCC_Ut_star) ===
160 if GT.GCC_Act.value in t.graphs:
161 key = GT.GCC_Act.value
162 g = {
163 'type': key,
164 'name': f'Grand Composite Curve: {graphTitle}',
165 'segments':
166 _graph_gcc(
167 t.graphs[key][PT.T.value].to_list(),
168 t.graphs[key][PT.H_NET_A.value].to_list(),
169 ) +
170 _graph_gcc(
171 t.graphs[key][PT.T.value].to_list(),
172 t.graphs[key][PT.H_UT_NET.value].to_list(),
173 ),
174 }
175 graph_set['graphs'].append(g)
177 # === Grand Composite Curve with vertical CC heat transfer (GCC_Ex and GCC_Ut_star) ===
178 if GT.GCC_Ex.value in t.graphs:
179 key = GT.GCC_Ex.value
180 g = {
181 'type': key,
182 'name': f'Grand Composite Curve: {graphTitle}',
183 'segments':
184 _graph_gcc(
185 t.graphs[key][PT.T.value].to_list(),
186 t.graphs[key][PT.H_NET_V.value].to_list(),
187 ),
188 }
189 graph_set['graphs'].append(g)
191 # === Total Site Profiles ===
192 if GT.TSP.value in t.graphs:
193 key = GT.TSP.value
194 g = {
195 'type': key,
196 'name': f'Total Site Profiles: {graphTitle}',
197 'segments':
198 _graph_cc(
199 StreamType.Hot.value,
200 t.graphs[key][PT.T.value].to_list(),
201 t.graphs[key][PT.H_HOT_NET.value].to_list(),
202 ) +
203 _graph_cc(
204 StreamType.Cold.value,
205 t.graphs[key][PT.T.value].to_list(),
206 t.graphs[key][PT.H_COLD_NET.value].to_list(),
207 ) +
208 _graph_cc(
209 StreamType.Cold.value,
210 t.graphs[key][PT.T.value].to_list(),
211 t.graphs[key][PT.H_COLD_UT.value].to_list(),
212 ) +
213 _graph_cc(
214 StreamType.Hot.value,
215 t.graphs[key][PT.T.value].to_list(),
216 t.graphs[key][PT.H_HOT_UT.value].to_list(),
217 ),
218 }
219 graph_set['graphs'].append(g)
221 # === Site Utility Grand Composite Curves ===
222 if GT.SUGCC.value in t.graphs:
223 key = GT.SUGCC.value
224 g = {
225 'type': key,
226 'name': f'Site Utility Grand Composite Curve: {graphTitle}',
227 'segments':
228 _graph_gcc(
229 t.graphs[key][PT.T.value].to_list(),
230 t.graphs[key][PT.H_UT_NET.value].to_list(),
231 ),
232 }
233 graph_set['graphs'].append(g)
235 return graph_set
238def _graph_cc(curve_type: str, y_vals: List[float], x_vals: List[float], IncludeArrows: bool =True, Decolour: bool =False) -> list:
239 """Plots a (shifted) hot or cold composite curve."""
241 # Clean composite
242 y_vals, x_vals = _clean_composite(y_vals, x_vals)
244 # Add Hot CC segment
245 if curve_type == StreamType.Hot.value:
246 return [_create_curve(
247 title='Hot CC',
248 colour=LineColour.Hot.value if not Decolour else LineColour.Black.value,
249 arrow=(ArrowHead.END.value if IncludeArrows else ArrowHead.NO_ARROW.value),
250 x_vals=x_vals,
251 y_vals=y_vals
252 )]
254 # Add Cold CC segment
255 elif curve_type == StreamType.Cold.value: 255 ↛ 265line 255 didn't jump to line 265 because the condition on line 255 was always true
256 return [_create_curve(
257 title='Cold CC',
258 colour=LineColour.Cold.value if not Decolour else LineColour.Black.value,
259 arrow=(ArrowHead.START.value if IncludeArrows else ArrowHead.NO_ARROW.value),
260 x_vals=x_vals,
261 y_vals=y_vals
262 )]
264 else:
265 raise ValueError("Unrecognised composite curve type.")
268def _graph_gcc(y_vals: List[float], x_vals: List[float], utility_profile: bool = False, decolour: bool = False) -> list:
269 """Creates segments for a Grand Composite Curve."""
271 # Clean composite
272 y_vals, x_vals = _clean_composite(y_vals, x_vals)
274 # Find start and end indices of useful data
275 start_idx = next((j for j in range(len(x_vals) - 1) if abs(x_vals[j] - x_vals[j + 1]) > ZERO), 0)
276 end_idx = next((j for j in range(len(x_vals) - 1, 0, -1) if abs(x_vals[j] - x_vals[j - 1]) > ZERO), len(x_vals) - 1)
278 curves = []
279 j = start_idx
281 # Counters for segment naming
282 cold_pro_segs, hot_pro_segs, hot_ut_segs, cold_ut_segs, zero_segs = 0, 0, 0, 0, 0
284 while j < end_idx:
285 enthalpy_diff = x_vals[j] - x_vals[j + 1]
286 segment_type = None
288 if enthalpy_diff > ZERO:
289 segment_type = 'cold_pro' if not utility_profile else 'hot_ut'
290 elif enthalpy_diff < -ZERO: 290 ↛ 293line 290 didn't jump to line 293 because the condition on line 290 was always true
291 segment_type = 'hot_pro' if not utility_profile else 'cold_ut'
292 else:
293 segment_type = 'zero'
295 # Find where this segment ends
296 next_j = j + 1
297 while next_j < end_idx:
298 next_diff = x_vals[next_j] - x_vals[next_j + 1]
299 if (
300 (segment_type == 'cold_pro' and next_diff < -ZERO) or
301 (segment_type == 'hot_pro' and next_diff > ZERO) or
302 (segment_type == 'hot_ut' and next_diff < -ZERO) or
303 (segment_type == 'cold_ut' and next_diff > ZERO) or
304 (segment_type == 'zero' and abs(next_diff) > ZERO)
305 ):
306 break
307 next_j += 1
309 # Extract segment data
310 x_seg = x_vals[j:next_j + 1]
311 y_seg = y_vals[j:next_j + 1]
313 # Segment title and color
314 if segment_type == 'cold_pro':
315 cold_pro_segs += 1
316 title = f"Cold Process Segment {cold_pro_segs}"
317 colour = LineColour.Cold.value
318 elif segment_type == 'hot_pro': 318 ↛ 322line 318 didn't jump to line 322 because the condition on line 318 was always true
319 hot_pro_segs += 1
320 title = f"Hot Process Segment {hot_pro_segs}"
321 colour = LineColour.Hot.value
322 elif segment_type == 'hot_ut':
323 hot_ut_segs += 1
324 title = f"Hot Utility Segment {hot_ut_segs}"
325 colour = LineColour.Hot.value
326 elif segment_type == 'cold_ut':
327 cold_ut_segs += 1
328 title = f"Cold Utility Segment {cold_ut_segs}"
329 colour = LineColour.Cold.value
330 else: # Zero
331 zero_segs += 1
332 title = f"Vertical Segment {zero_segs}"
333 colour = LineColour.Other.value
335 curves.append(_create_curve(
336 title=title,
337 colour=colour if not decolour else LineColour.Black.value,
338 x_vals=x_seg,
339 y_vals=y_seg,
340 arrow=ArrowHead.NO_ARROW.value
341 ))
343 j = next_j
345 return curves
348def _clean_composite(y_vals: List[float], x_vals: List[float]) -> Tuple[List[float], List[float]]:
349 """Remove redundant points in composite curves."""
351 # Round to avoid tiny numerical errors
352 x_vals = [round(x, 5) for x in x_vals]
353 y_vals = [round(y, 5) for y in y_vals]
355 if len(x_vals) <= 2: 355 ↛ 356line 355 didn't jump to line 356 because the condition on line 355 was never true
356 return y_vals, x_vals
358 x_clean, y_clean = [x_vals[0]], [y_vals[0]]
360 for i in range(1, len(x_vals) - 1):
361 x1, x2, x3 = x_vals[i-1], x_vals[i], x_vals[i+1]
362 y1, y2, y3 = y_vals[i-1], y_vals[i], y_vals[i+1]
364 if x1 == x3:
365 # All three x are the same; keep x2 only if y2 is different
366 if x1 != x2:
367 x_clean.append(x2)
368 y_clean.append(y2)
369 else:
370 # Linear interpolation check
371 y_interp = y1 + (y3 - y1) * (x2 - x1) / (x3 - x1)
372 if abs(y2 - y_interp) > ZERO:
373 x_clean.append(x2)
374 y_clean.append(y2)
376 x_clean.append(x_vals[-1])
377 y_clean.append(y_vals[-1])
379 if abs(x_clean[0] - x_clean[1]) < ZERO:
380 x_clean.pop(0)
381 y_clean.pop(0)
383 i = len(x_clean) - 1
384 if abs(x_clean[i] - x_clean[i-1]) < ZERO:
385 x_clean.pop(i)
386 y_clean.pop(i)
388 # offset = 0
389 # for i in range(len(x_clean) - 1):
390 # x1, x2 = x_clean[i - offset], x_clean[i+1 - offset]
391 # if abs(x1 - x2) < ZERO:
392 # x_clean.pop(i - offset)
393 # y_clean.pop(i - offset)
394 # offset += 1
395 # if offset > 1:
396 # pass
397 # else:
398 # break
400 # offset = 0
401 # for i in reversed(range(len(x_clean) - 1)):
402 # x1, x2 = x_clean[i - offset], x_clean[i-1 - offset]
403 # if abs(x1 - x2) < ZERO:
404 # x_clean.pop(i - offset)
405 # y_clean.pop(i - offset)
406 # offset += 1
407 # else:
408 # break
410 return y_clean, x_clean
413def _create_curve(title: str, colour: int, x_vals, y_vals, arrow=ArrowHead.NO_ARROW.value) -> dict:
414 """Creates an individual curve from data points."""
415 curve = {
416 'title': title,
417 'colour': colour,
418 'arrow': arrow
419 }
420 curve['data_points'] = [
421 {'x': round(x, DECIMAL_PLACES), 'y': round(y, DECIMAL_PLACES)}
422 for x, y in zip(x_vals, y_vals) if x != None and y != None
423 ]
424 return curve
427# def Graph_ETD(site: Zone, ETD, ETD_header, zone_name, graph, IncRecoveryHX=True, HCC=None):
428# """Graphs an ETD.
429# """
431# Tot_HX = [0 for i in range(4)]
432# x_seg = [0, 0]
433# y_seg = [0, 0]
435# x_col = len(ETD) - 2
436# y_col = 0
438# j_0 = 1
439# j_1 = len(ETD[0])
441# x_val = [0] * (j_1 - j_0 + 1)
442# y_val = [0] * (j_1 - j_0 + 1)
444# for j in range(j_0, j_1):
445# y_val[j - j_0] = ETD[y_col][j]
447# for i in range(x_col, 0, -3):
448# if not (ETD[i + 1][0] == 'R' and not IncRecoveryHX):
449# for j in range(j_0, j_1 - 1):
450# x_val[j] = ETD[i][j] + x_val[j]
451# if ETD[i + 1][0] == 'H':
452# Tot_HX[1] += 1
453# elif ETD[i + 1][0] == 'R':
454# Tot_HX[2] += 1
455# elif ETD[i + 1][0] == 'C':
456# Tot_HX[3] += 1
457# Tot_HX[0] = Tot_HX[1] + Tot_HX[2] + Tot_HX[3]
459# if site.config.AUTOREORDER:
460# if site.config.AUTOREORDER_1:
461# stat_row = 1
462# elif site.config.AUTOREORDER_2:
463# stat_row = 2
464# elif site.config.AUTOREORDER_3:
465# stat_row = 3
466# else:
467# stat_row = 4
468# else:
469# stat_row = 4
471# Graph_ETC(ETD, ETD_header, graph, zone_name, stat_row, y_col, x_val, y_val, 'R', IncRecoveryHX, Tot_HX[1])
472# Graph_ETC(ETD, ETD_header, graph, zone_name, stat_row, y_col, x_val, y_val, 'C', IncRecoveryHX, Tot_HX[2])
473# Graph_ETC(ETD, ETD_header, graph, zone_name, stat_row, y_col, x_val, y_val, 'H', IncRecoveryHX, Tot_HX[0])
475# if zone_name[-3:] == 'ACC':
476# _graph_cc(HCC, 4, 7, 0, graph, False, True)
478# site_ret = list(filter(lambda p: p.name == TargetType.ET.value, site.subzones))[0]
479# x_seg[1] = site_ret.cold_utility_target
480# y_seg[0] = ETD[1][len(ETD[1]) - 1]
481# y_seg[1] = ETD[1][len(ETD[1]) - 1]
483# graph['segments'].append(_create_curve(
484# title='Cold Ut Segment',
485# colour=LineColour.Cold.value,
486# x_vals= x_seg,
487# y_vals= y_seg
488# ))
490# y_seg[0] = ETD[0][1]
491# y_seg[1] = ETD[0][1]
492# if zone_name[-3:] != 'ACC':
493# x_seg[1] = site_ret.hot_utility_target
494# else:
495# x_seg[0] = site_ret.cold_utility_target + site_ret.heat_recovery_target
496# x_seg[1] = site_ret.hot_utility_target + x_seg[1]
498# graph['segments'].append(_create_curve(
499# title='Hot Ut Segment',
500# colour=LineColour.Hot.value,
501# x_vals=x_seg,
502# y_vals=y_seg
503# ))
505# return graph
508# def Graph_ETC(ETD, ETD_header, graph, zone_name, stat_row, y_col, x_val_base, y_val, HX_Type, IncRecoveryHX, HX_countdown):
509# if HX_countdown == 0:
510# return
511# prev_points = [None, None]
512# x_col = len(ETD) - 2
514# di = 3 if zone_name[-3:] == 'ACC' else 0
515# min_val = (10) ** 12
516# for i in range(len(ETD) - 2, di, -3):
517# if min_val > ETD_header[i - di][stat_row] and ETD[i + 1][0] == HX_Type:
518# min_val = ETD_header[i - di][stat_row]
519# x_col = i - di
521# if x_col <= len(ETD_header) - 1:
522# ETD_header[x_col][stat_row] = (10) ** 12
524# if ETD[x_col + 1][0] == HX_Type and (HX_Type == 'H' or HX_Type == 'C' or (HX_Type == 'R' and IncRecoveryHX)):
526# j_0 = 1
527# j_1 = len(ETD[1]) - 1
529# if HX_Type == 'R' or HX_Type == 'C':
530# for j_0 in range(j_0, len(ETD[0])):
531# if abs(ETD[x_col - 1][j_0 + 1]) > ZERO:
532# break
534# if HX_Type == 'R' or HX_Type == 'H':
535# for j_1 in range(j_1, 1, -1):
536# if abs(ETD[x_col - 1][j_1]) > ZERO:
537# break
539# j = j_0
540# while j < j_1:
541# tmp = ETD[x_col - 1][j + 1]
542# if tmp > ZERO:
543# # Find process heat deficit segments and utility heat supply segments
544# seg_type = 1
545# elif tmp < -ZERO:
546# # Find process heat surplus segments and utility heat sink segments
547# seg_type = 2
548# else:
549# # Zero heat transfer
550# seg_type = 3
552# for j in range(j + 1, j_1 - 1):
553# if seg_type == 1:
554# if ETD[x_col - 1][j + 1] < ZERO:
555# break
556# elif seg_type == 2:
557# if ETD[x_col - 1][j + 1] > -ZERO:
558# break
559# elif seg_type == 3:
560# if abs(ETD[x_col - 1][j + 1]) > ZERO:
561# break
563# j_i = j
564# x_seg = [None] * (j_i - j_0 + 3)
565# y_seg = [None] * (j_i - j_0 + 3)
567# for j in range(j_0, j_i + 3):
568# x_seg[j - j_0] = x_val_base[j]
569# y_seg[j - j_0] = ETD[y_col][j]
571# j += 1
572# j_0 = j
574# if prev_points[0] is not None:
575# x_seg.insert(0, prev_points[0])
576# y_seg.insert(0, prev_points[1])
577# prev_points[0] = x_seg[-1]
578# prev_points[1] = y_seg[-1]
579# else:
580# prev_points[0] = x_seg[-1]
581# prev_points[1] = y_seg[-1]
583# if seg_type == 1:
584# graph['segments'].append(_create_curve(
585# title='Cold Segment',
586# colour=LineColour.Cold.value,
587# x_vals=x_seg,
588# y_vals=y_seg
589# ))
590# elif seg_type == 2:
591# graph['segments'].append(_create_curve(
592# title='Hot Segment',
593# colour=LineColour.Hot.value,
594# x_vals=x_seg,
595# y_vals=y_seg
596# ))
597# else:
598# graph['segments'].append(_create_curve(
599# title='Zero Segment',
600# colour=LineColour.Other.value,
601# x_vals=x_seg,
602# y_vals=y_seg
603# ))
605# x_val = [0.0 for i in range(len(ETD[0]) - 1)]
606# for j in range(1, len(ETD[0]) - 1):
607# x_val[j] = x_val_base[j] - ETD[x_col][j]
608# else:
609# x_val = x_val_base
611# if HX_countdown > 1 + ZERO:
612# Graph_ETC(ETD, ETD_header, graph, zone_name, stat_row, y_col, x_val, y_val, HX_Type, IncRecoveryHX, HX_countdown - 1)
613# x_val_base = x_val
615# return graph
618# def Create_ERC_Graph_Set(graph_set: dict, site: Zone, process: Zone) -> dict:
619# graph = {'segments': [], 'type': GT.ERC.value, 'name': 'Shifted ETD'}
620# graph_set['graphs'].append(Graph_ETD(site, process.graphs['ETD_star'], process.graphs['ETD_header'], 'Shifted ETD', graph))
622# graph = {'segments': [], 'type': GT.ERC.value, 'name': 'ETD'}
623# graph_set['graphs'].append(Graph_ETD(site, process.graphs['ETD'], process.graphs['ETD_header'], 'ETD', graph))
625# graph = {'segments': [], 'type': GT.ERC.value, 'name': 'Shifted ETD without Recovery'}
626# graph_set['graphs'].append(Graph_ETD(site, process.graphs['ETD_star'], process.graphs['ETD_header'], 'Shifted ETD without recovery', graph, False))
628# graph = {'segments': [], 'type': GT.ERC.value, 'name': 'ETD without Recovery'}
629# graph_set['graphs'].append(Graph_ETD(site, process.graphs['ETD'], process.graphs['ETD_header'], 'ETD without recovery', graph, False))
631# return graph_set