Coverage for backend/django/PinchAnalysis/views/SegmentViewSet.py: 62%

145 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-12-18 04:00 +0000

1from PinchAnalysis.models.HenNode import HenNode 

2from core.auxiliary.views.ExtractSegmentDataFromFS import _calc_area 

3from core.auxiliary.enums.generalEnums import AbstractionType 

4from core.viewset import ModelViewSet 

5from PinchAnalysis.models.InputModels import PinchInputs, Segment, StreamDataEntry 

6from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes 

7from rest_framework.response import Response 

8from rest_framework import serializers 

9from rest_framework.decorators import action 

10import traceback 

11from PinchAnalysis.models.StreamDataProject import StreamDataProject 

12from flowsheetInternals.graphicData.models.groupingModel import Grouping 

13from PinchAnalysis.serializers.PinchInputSerializers import SegmentSerializer 

14from core.auxiliary.models.Flowsheet import Flowsheet 

15 

class BulkCreateStreamsSerializer(serializers.Serializer):
    """Request body for the ``bulk-create`` action.

    ``streams`` is a list of raw dicts; each dict must carry at least a
    ``parentZone`` key, which the view consumes before creating Segments.
    """

    projectID = serializers.IntegerField(required=True)
    streams = serializers.ListField(child=serializers.DictField(), required=True)

22 

class CustomSegmentSerializer(serializers.Serializer):
    """Response shape for the hierarchical zone/segment tree built by
    ``SegmentViewSet.list``.

    Used both for inner zone nodes (most values null) and for segment
    leaves. NOTE(review): zone nodes produced by ``clean_node`` omit the
    ``id`` and ``area`` keys even though both fields are declared here —
    confirm that serializing the root node actually succeeds.
    """

    id = serializers.IntegerField(allow_null=True)
    name = serializers.CharField()
    custom = serializers.BooleanField()
    type = serializers.CharField()
    # Thermodynamic state — null on zone nodes, populated on segment leaves.
    t_supply = serializers.FloatField(allow_null=True)
    t_target = serializers.FloatField(allow_null=True)
    p_supply = serializers.FloatField(allow_null=True)
    p_target = serializers.FloatField(allow_null=True)
    h_supply = serializers.FloatField(allow_null=True)
    h_target = serializers.FloatField(allow_null=True)
    heat_flow = serializers.FloatField(allow_null=True)
    dt_cont = serializers.FloatField(allow_null=True)
    htc = serializers.FloatField(allow_null=True)
    area = serializers.FloatField(allow_null=True)
    # Nested child nodes (zones or segments), passed through as raw dicts.
    children = serializers.ListField(child=serializers.DictField())

39 

class CreateSegmentSerializer(serializers.Serializer):
    """Request body for the ``create-new-segment`` action.

    ``parentZone`` is popped by the view and resolved to a Grouping; the
    remaining fields are passed straight to ``Segment.objects.create``.
    """

    name = serializers.CharField(required=True)
    t_supply = serializers.FloatField(required=True)
    t_target = serializers.FloatField(required=True)
    heat_flow = serializers.FloatField(required=True)
    dt_cont = serializers.FloatField(required=True)
    htc = serializers.FloatField(required=True)
    parentZone = serializers.CharField(required=True)

48 

class SegmentViewSet(ModelViewSet):
    """CRUD plus custom actions for pinch-analysis Segments.

    Extra routes:
      * GET  (list)                   -> hierarchical zone/segment tree.
      * POST create-new-segment       -> one custom segment under a zone.
      * POST bulk-create              -> many segments, grouped by zone.
      * POST delete-all               -> clears the project's PinchUtilities.
    """

    serializer_class = SegmentSerializer

    def get_queryset(self):
        """Return all Segments; no per-user or per-flowsheet filtering here."""
        return Segment.objects.all()

    @extend_schema(
        responses=CustomSegmentSerializer
    )
    def list(self, request):
        """Build and serialize a tree of zones with their segments as leaves.

        Walks every StreamDataProject, collects one node per zone name,
        attaches segments as leaf children, then links zones to their
        parent groups and serializes the single root node.
        """
        all_groups = {}

        # Step 1: build all_groups — one entry per zone name, holding the
        # zone's Grouping, its StreamDataEntry, and its segment leaves.
        for stream_data_project in StreamDataProject.objects.all():
            for stream_data_entry in stream_data_project.StreamDataEntries.all():
                zone = stream_data_entry.zone

                # NOTE(review): one Segment query per entry (N+1 pattern);
                # fine for small data sets, revisit if entries grow.
                segments = Segment.objects.filter(
                    stream_data_entry=stream_data_entry)
                segment_children = [{
                    "id": segment.id,
                    "name": segment.name,
                    "type": "Segment",
                    # The filter above guarantees segment.stream_data_entry
                    # is the entry in hand, so reuse it (no extra queries).
                    "custom": stream_data_entry.custom,
                    "t_supply": segment.t_supply,
                    "t_target": segment.t_target,
                    "p_supply": segment.p_supply,
                    "p_target": segment.p_target,
                    "h_supply": segment.h_supply,
                    "h_target": segment.h_target,
                    "heat_flow": segment.heat_flow,
                    "dt_cont": segment.dt_cont,
                    "htc": segment.htc,
                    "area": segment.area,
                    "stream_data_entry": stream_data_entry.id,
                    # Raw FK column instead of segment.hen_node.id: avoids a
                    # query and, crucially, does not raise AttributeError for
                    # segments without a HEN node (custom segments created by
                    # create_segment / bulk_create never set one).
                    "hen_node": segment.hen_node_id,
                    "children": []  # segments are always leaves
                } for segment in segments]

                if zone not in all_groups:
                    all_groups[zone] = {
                        "group": stream_data_entry.group,
                        "stream_data_entry": stream_data_entry,
                        "children": [],
                    }

                # Add segments to the current zone's children.
                all_groups[zone]["children"].extend(segment_children)

        # Step 2: organize into a tree by hooking each zone onto its parent.
        # TODO(review): messy temporary fix — if several parentless zones
        # exist, the last one iterated silently wins as root.
        root_node = None
        parent_groups = {}
        for zone_name, zone_data in all_groups.items():
            parent_group = zone_data["group"].get_parent_group()
            if not parent_group:
                root_node = zone_data
                continue
            parent_zone = parent_group.simulationObject.componentName
            if parent_zone in all_groups:
                all_groups[parent_zone]["children"].append(zone_data)
            elif parent_zone in parent_groups:
                parent_groups[parent_zone]["children"].append(zone_data)
            else:
                parent_groups[parent_zone] = {
                    "group": parent_group,
                    "children": [zone_data],
                }
                root_node = parent_groups[parent_zone]

        # Step 3: recursively convert internal nodes to the serializer shape.
        def clean_node(node):
            # Segment leaves are already in output shape.
            if node.get("type") == "Segment" or node.get("type") == "CustomSegment":
                return node
            group = node["group"]
            return {
                # "id" and "area" are declared on CustomSegmentSerializer;
                # they were previously omitted here for zone nodes, which
                # breaks serialization of the root.
                "id": None,
                "name": f'{group.simulationObject.componentName} ({group.abstractionType})',
                "type": group.abstractionType,
                "custom": False,
                "t_supply": None,
                "t_target": None,
                "p_supply": None,
                "p_target": None,
                "h_supply": None,
                "h_target": None,
                "heat_flow": None,
                "dt_cont": None,
                "htc": None,
                "area": None,
                "children": [clean_node(child) for child in node["children"]],
            }

        if root_node is None:
            return Response(data={})

        cleaned = clean_node(root_node)
        return Response(CustomSegmentSerializer(cleaned).data)

    @extend_schema(request=CreateSegmentSerializer, responses=None)
    @action(methods=['post'], detail=False, url_path='create-new-segment')
    def create_segment(self, request):
        """Create a single custom segment under the given parent zone.

        Query params: ``flowsheet`` — pk of the target Flowsheet.
        Body: CreateSegmentSerializer fields.
        Returns 201 with the serialized Segment.
        """
        serializer = CreateSegmentSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        flowsheet = Flowsheet.objects.get(pk=request.query_params.get("flowsheet"))
        parentZone = serializer.validated_data.pop("parentZone")
        # NOTE(review): lookup by component name only — presumably zone names
        # are globally unique; otherwise this should also filter by
        # flowsheet. TODO confirm.
        group = Grouping.objects.get(
            simulationObject__componentName=parentZone
        )

        stream_data_entry = StreamDataEntry.objects.create(
            flowsheet=flowsheet,
            custom=True,
            group=group,
            streamDataProject=flowsheet.StreamDataProject,
        )

        # NOTE(review): bulk_create passes flowsheet= to Segment(); this path
        # does not — confirm whether Segment.flowsheet should be set here too.
        segment = Segment.objects.create(
            stream_data_entry=stream_data_entry,
            **serializer.validated_data
        )
        # Derived quantity, computed once the field values are in place.
        segment.area = segment._calc_area()
        segment.save(update_fields=["area"])

        return Response(SegmentSerializer(segment).data, status=201)

    @extend_schema(request=BulkCreateStreamsSerializer, responses=None)
    @action(methods=['post'], detail=False, url_path='bulk-create')
    def bulk_create(self, request):
        """Create many segments at once, grouped by their ``parentZone``.

        Creates missing zone Groupings on the fly, one StreamDataEntry per
        zone, and bulk-inserts all segments. Returns 201 on success or 400
        with traceback details on any failure.
        """
        try:
            serializer = BulkCreateStreamsSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)
            flowsheet = Flowsheet.objects.get(pk=request.query_params.get("flowsheet"))

            # Group streams by zone up front (dicts preserve first-appearance
            # order, matching the old streamZones list). Replaces the former
            # re-filter-and-list.remove() loop, which mutated the stream list
            # mid-iteration at O(n) per removal.
            streams_by_zone = {}
            for stream in serializer.validated_data.get("streams"):
                streams_by_zone.setdefault(stream["parentZone"], []).append(stream)

            stream_objects = []
            for zone, zone_streams in streams_by_zone.items():
                if not Grouping.objects.filter(simulationObject__componentName=zone).exists():
                    # This should probably be handled by a bulk create within
                    # the upload process for open pinch.
                    parentGroup = Grouping.create(
                        flowsheet=flowsheet,
                        group=Grouping.objects.get(simulationObject__componentName="Flowsheet"),
                        componentName=zone,
                        visible=False
                    )
                    parentGroup.abstractionType = AbstractionType.Zone
                    parentGroup.save()
                else:
                    parentGroup = Grouping.objects.get(
                        simulationObject__componentName=zone
                    )
                stream_data_entry = StreamDataEntry.objects.create(
                    flowsheet=flowsheet,
                    custom=True,
                    group=parentGroup,
                    streamDataProject=flowsheet.StreamDataProject,
                )
                for stream in zone_streams:
                    # parentZone is routing info, not a Segment field.
                    del stream["parentZone"]
                    segment = Segment(flowsheet=flowsheet, stream_data_entry=stream_data_entry, **stream)
                    segment.area = segment._calc_area()
                    stream_objects.append(segment)

            Segment.objects.bulk_create(stream_objects)

            return Response({'status': 'success'}, status=201)
        except Exception as e:
            return self.error_response(e)

    @action(methods=['post'], detail=False, url_path='delete-all')
    def delete_all(self, request):
        """Delete all pinch utilities of the flowsheet's stream-data project.

        NOTE(review): despite living on the Segment viewset, this deletes
        ``PinchUtilities`` — confirm that is intentional and not a
        copy-paste from a utilities viewset.
        """
        try:
            flowsheet = Flowsheet.objects.get(pk=request.query_params.get("flowsheet"))
            project = flowsheet.StreamDataProject

            project.Inputs.PinchUtilities.all().delete()

            # HTTP 204 must not carry a message body (RFC 9110 §15.3.5);
            # the previous code attached one.
            return Response(status=204)
        except Exception as e:
            return self.error_response(e)

    def error_response(self, e):
        """Wrap an exception into a 400 response carrying message + traceback."""
        tb_info = traceback.format_exc()
        response_data = {
            'status': 'error',
            'message': str(e),
            'traceback': tb_info,
        }
        return Response(response_data, status=400)