Coverage for backend/pinch_service/OpenPinch/src/main.py: 46%
44 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1import os
2import json
3from .classes import *
4from .lib import *
5from .utils import *
6from .analysis import prepare_problem_struture, get_site_targets, get_process_pinch_targets, get_regional_targets, visualise_graphs, output_response
9__all__ = ["target", "visualise"]
11@timing_decorator
12def target(streams: List[StreamSchema], utilities: List[UtilitySchema], options: List[Options], name: str ='Project', zone_tree: ZoneTreeSchema = None) -> dict:
13 """Conduct advanced pinch analysis and total site analysis on the given streams and utilities."""
14 master_zone = prepare_problem_struture(streams, utilities, options, name, zone_tree)
15 if master_zone.identifier in [ZoneType.R.value, ZoneType.C.value]: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true
16 master_zone = get_regional_targets(master_zone)
17 elif master_zone.identifier == ZoneType.S.value:
18 master_zone = get_site_targets(master_zone)
19 elif master_zone.identifier == ZoneType.P.value: 19 ↛ 22line 19 didn't jump to line 22 because the condition on line 19 was always true
20 master_zone = get_process_pinch_targets(master_zone)
21 else:
22 raise ValueError("No valid zone passed into OpenPinch for analysis.")
24 return output_response(master_zone)
27########### TODO: This function is untested and not updated since the overhaul of OpenPinch. Broken, most likely.#####
28@timing_decorator
29def visualise(data) -> dict:
30 """Function for building graphs from problem tables as opposed to class instances."""
31 r_data = {'graphs': []}
32 z: Zone
33 for z in data:
34 graph_set = {'name': f"{z.name}", 'graphs': []}
35 for graph in z.graphs:
36 visualise_graphs(graph_set, graph)
37 r_data['graphs'].append(graph_set)
38 return r_data
41#######################################################################################################
42# Local Run Functions
43#######################################################################################################
45def targeting_analysis_from_pinch_service(data: Any, parent_fs_name: str ='Project') -> TargetResponse:
46 """Calculates targets and outputs from inputs and options"""
47 # Validate request data using Pydantic model
48 request_data = TargetRequest.model_validate(data)
50 # Perform advanced pinch analysis and total site analysis
51 return_data = target(
52 zone_tree=request_data.zone_tree,
53 streams=request_data.streams,
54 utilities=request_data.utilities,
55 options=request_data.options,
56 name=parent_fs_name,
57 )
59 # Validate response data
60 validated_data = TargetResponse.model_validate(return_data)
62 # Return data
63 return validated_data
66def get_test_filenames():
67 filepath = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'tests/test_data')
68 return [
69 {"filename": filename, "filepath": filepath}
70 for filename in os.listdir(filepath)
71 if filename.startswith("p_") and filename.endswith(".json")
72 ]
75def run_pinch_analysis_comparison(file_info):
76 p_file_path = os.path.join(file_info["filepath"], 'p_' + file_info["filename"][2:])
77 r_file_path = os.path.join(file_info["filepath"], 'r_' + file_info["filename"][2:])
79 with open(p_file_path) as f:
80 input_data = json.load(f)
82 res = targeting_analysis_from_pinch_service(input_data, file_info["filename"])
84 with open(r_file_path) as f:
85 wkb_res = json.load(f)
86 wkb_res = TargetResponse.model_validate(wkb_res)
88 # print(f'Name: {res.name}')
89 # for z in res.targets:
90 # for z0 in wkb_res.targets:
91 # if z0.name in z.name:
92 # break
94 # print('')
95 # print('Name:', z.name, z0.name)
96 # print('Qh:', round(get_value(z.Qh), 2), 'Qh:', round(get_value(z0.Qh), 2), round(get_value(z.Qh), 2)==round(get_value(z0.Qh), 2), sep='\t')
97 # print('Qc:', round(get_value(z.Qc), 2), 'Qc:', round(get_value(z0.Qc), 2), round(get_value(z.Qc), 2)==round(get_value(z0.Qc), 2), sep='\t')
98 # print('Qr:', round(get_value(z.Qr), 2), 'Qr:', round(get_value(z0.Qr), 2), round(get_value(z.Qr), 2)==round(get_value(z0.Qr), 2), sep='\t')
99 # [print(z.hot_utilities[i].name + ':', round(get_value(z.hot_utilities[i].heat_flow), 2), z0.hot_utilities[i].name + ':', round(get_value(z0.hot_utilities[i].heat_flow), 2), round(get_value(z.hot_utilities[i].heat_flow), 2)==round(get_value(z0.hot_utilities[i].heat_flow), 2), sep='\t') for i in range(len(z.hot_utilities))]
100 # [print(z.cold_utilities[i].name + ':', round(get_value(z.cold_utilities[i].heat_flow), 2), z0.cold_utilities[i].name + ':', round(get_value(z0.cold_utilities[i].heat_flow), 2), round(get_value(z.cold_utilities[i].heat_flow), 2)==round(get_value(z0.cold_utilities[i].heat_flow), 2), sep='\t') for i in range(len(z.cold_utilities))]
101 # print('')