Coverage for backend/django/PinchAnalysis/views/SegmentViewSet.py: 62%
145 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-12-18 04:00 +0000
1from PinchAnalysis.models.HenNode import HenNode
2from core.auxiliary.views.ExtractSegmentDataFromFS import _calc_area
3from core.auxiliary.enums.generalEnums import AbstractionType
4from core.viewset import ModelViewSet
5from PinchAnalysis.models.InputModels import PinchInputs, Segment, StreamDataEntry
6from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes
7from rest_framework.response import Response
8from rest_framework import serializers
9from rest_framework.decorators import action
10import traceback
11from PinchAnalysis.models.StreamDataProject import StreamDataProject
12from flowsheetInternals.graphicData.models.groupingModel import Grouping
13from PinchAnalysis.serializers.PinchInputSerializers import SegmentSerializer
14from core.auxiliary.models.Flowsheet import Flowsheet
# Request body for the ``bulk-create`` action.
#
# ``streams`` is a list of free-form dicts; ``SegmentViewSet.bulk_create`` reads
# a "parentZone" key from each one and forwards the remaining keys to the
# ``Segment`` constructor. ``projectID`` is required by validation but is not
# read by ``bulk_create`` in this file.
#
# (Kept as ``#`` comments instead of a docstring so the generated OpenAPI
# component description is unchanged.)
class BulkCreateStreamsSerializer(serializers.Serializer):
    projectID = serializers.IntegerField(required=True)
    streams = serializers.ListField(required=True, child=serializers.DictField())
# Response shape for one node of the tree returned by ``SegmentViewSet.list``.
#
# Every numeric column, as well as ``id``, accepts ``None``. ``children`` is a
# list of free-form dicts and is passed through without field-level
# validation.
#
# (Kept as ``#`` comments instead of a docstring so the generated OpenAPI
# component description is unchanged.)
class CustomSegmentSerializer(serializers.Serializer):
    # Identity / classification.
    id = serializers.IntegerField(allow_null=True)
    name = serializers.CharField()
    custom = serializers.BooleanField()
    type = serializers.CharField()

    # Supply/target values.
    t_supply = serializers.FloatField(allow_null=True)
    t_target = serializers.FloatField(allow_null=True)
    p_supply = serializers.FloatField(allow_null=True)
    p_target = serializers.FloatField(allow_null=True)
    h_supply = serializers.FloatField(allow_null=True)
    h_target = serializers.FloatField(allow_null=True)

    # Remaining numeric properties.
    heat_flow = serializers.FloatField(allow_null=True)
    dt_cont = serializers.FloatField(allow_null=True)
    htc = serializers.FloatField(allow_null=True)
    area = serializers.FloatField(allow_null=True)

    # Nested nodes (zones or segments).
    children = serializers.ListField(child=serializers.DictField())
# Request body for the ``create-new-segment`` action.
#
# ``parentZone`` is used by ``SegmentViewSet.create_segment`` to look up a
# ``Grouping`` by component name; all other fields are passed straight to
# ``Segment.objects.create``.
#
# (Kept as ``#`` comments instead of a docstring so the generated OpenAPI
# component description is unchanged.)
class CreateSegmentSerializer(serializers.Serializer):
    name = serializers.CharField(required=True)

    t_supply = serializers.FloatField(required=True)
    t_target = serializers.FloatField(required=True)
    heat_flow = serializers.FloatField(required=True)
    dt_cont = serializers.FloatField(required=True)
    htc = serializers.FloatField(required=True)

    parentZone = serializers.CharField(required=True)
class SegmentViewSet(ModelViewSet):
    # Endpoints for stream segments: a tree-shaped listing plus single/bulk
    # creation and a bulk delete helper.
    #
    # Documentation for this class and its routed actions is written as ``#``
    # comments on purpose: drf-spectacular turns view/action docstrings into
    # OpenAPI descriptions, and documenting the code should not alter the
    # generated schema.
    serializer_class = SegmentSerializer

    def get_queryset(self):
        """Return the queryset of all ``Segment`` rows."""
        return Segment.objects.all()

    @staticmethod
    def _segment_node(segment, stream_data_entry):
        """Build the leaf dict for ``segment``.

        ``stream_data_entry`` is the entry the segment was filtered by, so its
        ``custom`` flag and ``id`` are read from it directly instead of going
        through ``segment.stream_data_entry`` (which would hit the database
        again for every segment).
        """
        return {
            "id": segment.id,
            "name": segment.name,
            "type": "Segment",
            "custom": stream_data_entry.custom,
            "t_supply": segment.t_supply,
            "t_target": segment.t_target,
            "p_supply": segment.p_supply,
            "p_target": segment.p_target,
            "h_supply": segment.h_supply,
            "h_target": segment.h_target,
            "heat_flow": segment.heat_flow,
            "dt_cont": segment.dt_cont,
            "htc": segment.htc,
            "area": segment.area,
            "stream_data_entry": stream_data_entry.id,
            # NOTE(review): raises if a segment has no hen_node - confirm that
            # every Segment is guaranteed to have one.
            "hen_node": segment.hen_node.id,
            "children": [],  # Segments don't have children
        }

    @classmethod
    def _clean_node(cls, node):
        """Convert an internal tree node into its serialisable form.

        Segment leaves are returned untouched; zone nodes (dicts holding a
        ``group`` and ``children``) are rebuilt recursively with every numeric
        column set to ``None``.
        """
        if node.get("type") in ("Segment", "CustomSegment"):
            return node

        group = node["group"]
        return {
            # Explicit None keeps zone nodes aligned with the fields declared
            # on CustomSegmentSerializer (id / area included).
            "id": None,
            "name": f'{group.simulationObject.componentName} ({group.abstractionType})',
            "type": group.abstractionType,
            "custom": False,
            "t_supply": None,
            "t_target": None,
            "p_supply": None,
            "p_target": None,
            "h_supply": None,
            "h_target": None,
            "heat_flow": None,
            "dt_cont": None,
            "htc": None,
            "area": None,
            "children": [cls._clean_node(child) for child in node["children"]],
        }

    @extend_schema(
        responses=CustomSegmentSerializer
    )
    def list(self, request):
        # Returns a single tree: zones as inner nodes, segments as leaves.
        # Responds with ``{}`` when no root zone could be determined.
        zones = {}

        # Step 1: collect every zone together with its segment leaves.
        for project in StreamDataProject.objects.all():
            for stream_data_entry in project.StreamDataEntries.all():
                zone_name = stream_data_entry.zone
                if zone_name not in zones:
                    zones[zone_name] = {
                        "group": stream_data_entry.group,
                        "children": [],
                    }
                zones[zone_name]["children"].extend(
                    self._segment_node(segment, stream_data_entry)
                    for segment in Segment.objects.filter(
                        stream_data_entry=stream_data_entry)
                )

        # Step 2: hang each zone under its parent.
        # Need to review the parent groups creation here, it's a messy
        # temporary fix but likely to cause problems longterm.
        root_node = None
        parent_groups = {}
        for zone_data in zones.values():
            parent_group = zone_data["group"].get_parent_group()
            if not parent_group:
                # A zone without a parent is treated as the root; if several
                # exist, the last one visited wins.
                root_node = zone_data
                continue

            parent_zone = parent_group.simulationObject.componentName
            if parent_zone in zones:
                zones[parent_zone]["children"].append(zone_data)
            elif parent_zone in parent_groups:
                parent_groups[parent_zone]["children"].append(zone_data)
            else:
                # Parent has no stream data of its own: synthesise a node for
                # it and treat it as the root.
                parent_groups[parent_zone] = {
                    "group": parent_group,
                    "children": [zone_data],
                }
                root_node = parent_groups[parent_zone]

        # Step 3: format the output recursively.
        if root_node is None:
            return Response(data={})

        cleaned = self._clean_node(root_node)
        return Response(CustomSegmentSerializer(cleaned).data)

    @extend_schema(request=CreateSegmentSerializer, responses=None)
    @action(methods=['post'], detail=False, url_path='create-new-segment')
    def create_segment(self, request):
        # Creates one custom StreamDataEntry plus one Segment under the zone
        # named by ``parentZone``. The flowsheet id comes from the
        # ``flowsheet`` query parameter. Responds 201 with the serialised
        # segment, or 400 (via error_response) when a lookup fails.
        serializer = CreateSegmentSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        # Work on a copy so the serializer's validated data is left intact.
        segment_fields = dict(serializer.validated_data)
        parent_zone = segment_fields.pop("parentZone")

        # Only the lookups are guarded: a bad flowsheet id or zone name is a
        # client error and is reported the same way the sibling actions do.
        try:
            flowsheet = Flowsheet.objects.get(
                pk=request.query_params.get("flowsheet"))
            parent_group = Grouping.objects.get(
                simulationObject__componentName=parent_zone
            )
        except (
            Flowsheet.DoesNotExist,
            Grouping.DoesNotExist,
            Grouping.MultipleObjectsReturned,
            ValueError,
        ) as e:
            return self.error_response(e)

        # NOTE(review): the two writes below are not wrapped in a transaction,
        # so a failure creating the Segment leaves the StreamDataEntry behind.
        stream_data_entry = StreamDataEntry.objects.create(
            flowsheet=flowsheet,
            custom=True,
            group=parent_group,
            streamDataProject=flowsheet.StreamDataProject,
        )

        segment = Segment.objects.create(
            stream_data_entry=stream_data_entry,
            **segment_fields
        )
        segment.area = segment._calc_area()
        segment.save(update_fields=["area"])

        return Response(SegmentSerializer(segment).data, status=201)

    @staticmethod
    def _zone_group(flowsheet, zone):
        """Return the ``Grouping`` named ``zone``, creating it when missing.

        A newly created grouping is attached to the grouping whose component
        name is "Flowsheet", is not visible, and is typed as a Zone.
        ``MultipleObjectsReturned`` from either lookup propagates to the
        caller.
        """
        try:
            return Grouping.objects.get(simulationObject__componentName=zone)
        except Grouping.DoesNotExist:
            pass

        # This should probably be handled by a bulk create within the upload
        # process for open pinch.
        zone_group = Grouping.create(
            flowsheet=flowsheet,
            group=Grouping.objects.get(
                simulationObject__componentName="Flowsheet"),
            componentName=zone,
            visible=False
        )
        zone_group.abstractionType = AbstractionType.Zone
        zone_group.save()
        return zone_group

    @extend_schema(request=BulkCreateStreamsSerializer, responses=None)
    @action(methods=['post'], detail=False, url_path='bulk-create')
    def bulk_create(self, request):
        # Creates one custom StreamDataEntry per distinct "parentZone" and one
        # Segment per stream dict. Responds 201 {'status': 'success'}; any
        # exception is reported through error_response (400).
        try:
            serializer = BulkCreateStreamsSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)
            flowsheet = Flowsheet.objects.get(
                pk=request.query_params.get("flowsheet"))

            # Group streams by zone in one pass, keeping first-seen zone
            # order. Each stream is copied so the validated input is not
            # mutated; a missing "parentZone" raises KeyError before any
            # database write happens.
            streams_by_zone = {}
            for stream in serializer.validated_data.get("streams"):
                segment_fields = dict(stream)
                zone = segment_fields.pop("parentZone")
                streams_by_zone.setdefault(zone, []).append(segment_fields)

            # NOTE(review): groupings and entries are written before the
            # final bulk insert and nothing here is transactional, so a
            # failure part-way through leaves partial data behind.
            new_segments = []
            for zone, zone_streams in streams_by_zone.items():
                stream_data_entry = StreamDataEntry.objects.create(
                    flowsheet=flowsheet,
                    custom=True,
                    group=self._zone_group(flowsheet, zone),
                    streamDataProject=flowsheet.StreamDataProject,
                )
                for segment_fields in zone_streams:
                    segment = Segment(
                        flowsheet=flowsheet,
                        stream_data_entry=stream_data_entry,
                        **segment_fields
                    )
                    segment.area = segment._calc_area()
                    new_segments.append(segment)

            Segment.objects.bulk_create(new_segments)

            return Response({'status': 'success'}, status=201)
        except Exception as e:
            return self.error_response(e)

    @action(methods=['post'], detail=False, url_path='delete-all')
    def delete_all(self, request):
        # Deletes every PinchUtilities row reachable from the flowsheet's
        # StreamDataProject inputs. Responds with an empty 204, or 400 via
        # error_response on any exception.
        #
        # NOTE(review): despite the name and the viewset it lives on, this
        # method does not delete segments - confirm that is intended.
        try:
            flowsheet = Flowsheet.objects.get(
                pk=request.query_params.get("flowsheet"))
            project = flowsheet.StreamDataProject

            project.Inputs.PinchUtilities.all().delete()

            # 204 No Content must not carry a body.
            return Response(status=204)
        except Exception as e:
            return self.error_response(e)

    def error_response(self, e):
        """Return a 400 response describing exception ``e``.

        Must be called from inside an ``except`` block: the traceback is taken
        from the exception currently being handled.
        """
        # NOTE(review): the full traceback is sent to the client, which
        # exposes server internals. Kept because callers may read the
        # 'traceback' key; consider logging it server-side instead.
        response_data = {
            'status': 'error',
            'message': str(e),
            'traceback': traceback.format_exc(),
        }
        return Response(response_data, status=400)