Coverage for backend/pinch_service/OpenPinch/src/main.py: 46%

44 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1import os 

2import json 

3from .classes import * 

4from .lib import * 

5from .utils import * 

6from .analysis import prepare_problem_struture, get_site_targets, get_process_pinch_targets, get_regional_targets, visualise_graphs, output_response 

7 

8 

9__all__ = ["target", "visualise"] 

10 

11@timing_decorator 

12def target(streams: List[StreamSchema], utilities: List[UtilitySchema], options: List[Options], name: str ='Project', zone_tree: ZoneTreeSchema = None) -> dict: 

13 """Conduct advanced pinch analysis and total site analysis on the given streams and utilities.""" 

14 master_zone = prepare_problem_struture(streams, utilities, options, name, zone_tree) 

15 if master_zone.identifier in [ZoneType.R.value, ZoneType.C.value]: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true

16 master_zone = get_regional_targets(master_zone) 

17 elif master_zone.identifier == ZoneType.S.value: 

18 master_zone = get_site_targets(master_zone) 

19 elif master_zone.identifier == ZoneType.P.value: 19 ↛ 22line 19 didn't jump to line 22 because the condition on line 19 was always true

20 master_zone = get_process_pinch_targets(master_zone) 

21 else: 

22 raise ValueError("No valid zone passed into OpenPinch for analysis.") 

23 

24 return output_response(master_zone) 

25 

26 

27########### TODO: This function is untested and not updated since the overhaul of OpenPinch. Broken, most likely.##### 

28@timing_decorator 

29def visualise(data) -> dict: 

30 """Function for building graphs from problem tables as opposed to class instances.""" 

31 r_data = {'graphs': []} 

32 z: Zone 

33 for z in data: 

34 graph_set = {'name': f"{z.name}", 'graphs': []} 

35 for graph in z.graphs: 

36 visualise_graphs(graph_set, graph) 

37 r_data['graphs'].append(graph_set) 

38 return r_data 

39 

40 

41####################################################################################################### 

42# Local Run Functions 

43####################################################################################################### 

44 

45def targeting_analysis_from_pinch_service(data: Any, parent_fs_name: str ='Project') -> TargetResponse: 

46 """Calculates targets and outputs from inputs and options""" 

47 # Validate request data using Pydantic model 

48 request_data = TargetRequest.model_validate(data) 

49 

50 # Perform advanced pinch analysis and total site analysis 

51 return_data = target( 

52 zone_tree=request_data.zone_tree, 

53 streams=request_data.streams, 

54 utilities=request_data.utilities, 

55 options=request_data.options, 

56 name=parent_fs_name, 

57 ) 

58 

59 # Validate response data 

60 validated_data = TargetResponse.model_validate(return_data) 

61 

62 # Return data 

63 return validated_data 

64 

65 

66def get_test_filenames(): 

67 filepath = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'tests/test_data') 

68 return [ 

69 {"filename": filename, "filepath": filepath} 

70 for filename in os.listdir(filepath) 

71 if filename.startswith("p_") and filename.endswith(".json") 

72 ] 

73 

74 

75def run_pinch_analysis_comparison(file_info): 

76 p_file_path = os.path.join(file_info["filepath"], 'p_' + file_info["filename"][2:]) 

77 r_file_path = os.path.join(file_info["filepath"], 'r_' + file_info["filename"][2:]) 

78 

79 with open(p_file_path) as f: 

80 input_data = json.load(f) 

81 

82 res = targeting_analysis_from_pinch_service(input_data, file_info["filename"]) 

83 

84 with open(r_file_path) as f: 

85 wkb_res = json.load(f) 

86 wkb_res = TargetResponse.model_validate(wkb_res) 

87 

88 # print(f'Name: {res.name}') 

89 # for z in res.targets: 

90 # for z0 in wkb_res.targets: 

91 # if z0.name in z.name: 

92 # break 

93 

94 # print('') 

95 # print('Name:', z.name, z0.name) 

96 # print('Qh:', round(get_value(z.Qh), 2), 'Qh:', round(get_value(z0.Qh), 2), round(get_value(z.Qh), 2)==round(get_value(z0.Qh), 2), sep='\t') 

97 # print('Qc:', round(get_value(z.Qc), 2), 'Qc:', round(get_value(z0.Qc), 2), round(get_value(z.Qc), 2)==round(get_value(z0.Qc), 2), sep='\t') 

98 # print('Qr:', round(get_value(z.Qr), 2), 'Qr:', round(get_value(z0.Qr), 2), round(get_value(z.Qr), 2)==round(get_value(z0.Qr), 2), sep='\t') 

99 # [print(z.hot_utilities[i].name + ':', round(get_value(z.hot_utilities[i].heat_flow), 2), z0.hot_utilities[i].name + ':', round(get_value(z0.hot_utilities[i].heat_flow), 2), round(get_value(z.hot_utilities[i].heat_flow), 2)==round(get_value(z0.hot_utilities[i].heat_flow), 2), sep='\t') for i in range(len(z.hot_utilities))] 

100 # [print(z.cold_utilities[i].name + ':', round(get_value(z.cold_utilities[i].heat_flow), 2), z0.cold_utilities[i].name + ':', round(get_value(z0.cold_utilities[i].heat_flow), 2), round(get_value(z.cold_utilities[i].heat_flow), 2)==round(get_value(z0.cold_utilities[i].heat_flow), 2), sep='\t') for i in range(len(z.cold_utilities))] 

101 # print('') 

102