Coverage for backend/django/PinchAnalysis/views/SegmentViewSet.py: 65%
145 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1from PinchAnalysis.models.HenNode import HenNode
2from core.auxiliary.views.ExtractSegmentDataFromFS import _calc_area
3from core.auxiliary.enums.generalEnums import AbstractionType
4from core.viewset import ModelViewSet
5from PinchAnalysis.models.InputModels import PinchInputs, Segment, StreamDataEntry
6from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes
7from rest_framework.response import Response
8from rest_framework import serializers
9from rest_framework.decorators import action
10import traceback
11from PinchAnalysis.models.StreamDataProject import StreamDataProject
12from flowsheetInternals.graphicData.models.groupingModel import Grouping
13from PinchAnalysis.serializers.PinchInputSerializers import SegmentSerializer
14from core.auxiliary.models.Flowsheet import Flowsheet
class BulkCreateStreamsSerializer(serializers.Serializer):
    """Request payload validated by ``SegmentViewSet.bulk_create``.

    ``streams`` is a list of free-form dictionaries; their keys are not
    validated by this serializer.
    """

    projectID = serializers.IntegerField(required=True)
    streams = serializers.ListField(child=serializers.DictField(), required=True)
class CustomSegmentSerializer(serializers.Serializer):
    """Shape of one node in the tree returned by ``SegmentViewSet.list``.

    Every numeric field (and ``id``) accepts ``None``; ``children`` holds the
    nested nodes as plain dictionaries.
    """

    # Identity / classification of the node.
    id = serializers.IntegerField(allow_null=True)
    name = serializers.CharField()
    custom = serializers.BooleanField()
    type = serializers.CharField()

    # Supply / target values.
    t_supply = serializers.FloatField(allow_null=True)
    t_target = serializers.FloatField(allow_null=True)
    p_supply = serializers.FloatField(allow_null=True)
    p_target = serializers.FloatField(allow_null=True)
    h_supply = serializers.FloatField(allow_null=True)
    h_target = serializers.FloatField(allow_null=True)

    # Remaining per-segment quantities.
    heat_flow = serializers.FloatField(allow_null=True)
    dt_cont = serializers.FloatField(allow_null=True)
    htc = serializers.FloatField(allow_null=True)
    area = serializers.FloatField(allow_null=True)

    # Nested nodes, passed through as dictionaries.
    children = serializers.ListField(child=serializers.DictField())
class CreateSegmentSerializer(serializers.Serializer):
    """Request payload validated by ``SegmentViewSet.create_segment``.

    All fields are mandatory. ``parentZone`` is the component name used to
    look up the ``Grouping`` the new segment is attached to; the remaining
    fields are forwarded to ``Segment.objects.create``.
    """

    name = serializers.CharField(required=True)
    t_supply = serializers.FloatField(required=True)
    t_target = serializers.FloatField(required=True)
    heat_flow = serializers.FloatField(required=True)
    dt_cont = serializers.FloatField(required=True)
    htc = serializers.FloatField(required=True)
    parentZone = serializers.CharField(required=True)
class SegmentViewSet(ModelViewSet):
    """Endpoints for reading and creating pinch-analysis ``Segment`` rows.

    Besides the inherited model-viewset behaviour this class provides:

    * ``list`` - a zone/segment tree instead of a flat list;
    * ``create-new-segment`` - create one custom segment under a zone;
    * ``bulk-create`` - create many custom segments, creating zones on demand;
    * ``delete-all`` - clear data hanging off the flowsheet's stream project.
    """

    serializer_class = SegmentSerializer

    def get_queryset(self):
        """Return all ``Segment`` rows; this method applies no filtering."""
        return Segment.objects.all()

    @staticmethod
    def _segment_node(segment, stream_data_entry):
        """Build the leaf dictionary describing ``segment`` in the list tree.

        ``stream_data_entry`` is the entry the segment was filtered by, so its
        ``custom`` flag and ``id`` are read from it directly instead of going
        back through ``segment.stream_data_entry``.
        """
        return {
            "id": segment.id,
            "name": segment.name,
            "type": "Segment",
            "custom": stream_data_entry.custom,
            "t_supply": segment.t_supply,
            "t_target": segment.t_target,
            "p_supply": segment.p_supply,
            "p_target": segment.p_target,
            "h_supply": segment.h_supply,
            "h_target": segment.h_target,
            "heat_flow": segment.heat_flow,
            "dt_cont": segment.dt_cont,
            "htc": segment.htc,
            "area": segment.area,
            "stream_data_entry": stream_data_entry.id,
            "hen_node": segment.hen_node.id if segment.hen_node else None,
            "children": [],  # Segments don't have children
        }

    @extend_schema(
        responses=CustomSegmentSerializer
    )
    def list(self, request):
        """Return the zone/segment hierarchy as a single nested tree.

        Every ``StreamDataEntry`` of every ``StreamDataProject`` is visited;
        its segments become leaf nodes under the entry's zone. Zones are then
        attached to their parent group's node, and the resulting root is
        serialized with ``CustomSegmentSerializer``.

        Returns:
            Response: the serialized root node, or ``{}`` when no root was
            found (for example when there are no stream data entries).
        """
        all_groups = {}

        # Step 1: Build all_groups with zones and child Zones/Segments
        for stream_data_project in StreamDataProject.objects.all():
            for stream_data_entry in stream_data_project.StreamDataEntries.all():
                # NOTE(review): ``zone`` is compared against component names
                # in step 2, so it is presumably the zone's component name.
                zone = stream_data_entry.zone

                segments = Segment.objects.filter(
                    stream_data_entry=stream_data_entry)
                segment_children = [
                    self._segment_node(segment, stream_data_entry)
                    for segment in segments
                ]

                if zone not in all_groups:
                    # Only the first entry seen for a zone supplies its group.
                    all_groups[zone] = {
                        "group": stream_data_entry.group,
                        "stream_data_entry": stream_data_entry,
                        "children": [],
                    }

                # Add segments to the current zone's children
                all_groups[zone]["children"].extend(segment_children)

        # Step 2: Organize into a tree
        # TODO: Need to review the parent groups creation here, it's a messy
        # temporary fix but likely to cause problems longterm.
        # The root is whichever candidate is assigned last: a zone without a
        # parent group, or a placeholder created for a parent that has no
        # stream data entry of its own.
        root_node = None
        parent_groups = {}
        for zone_data in all_groups.values():
            parent_group = zone_data["group"].get_parent_group()
            if not parent_group:
                root_node = zone_data
                continue

            parent_zone = parent_group.simulationObject.componentName
            if parent_zone in all_groups:
                all_groups[parent_zone]["children"].append(zone_data)
            elif parent_zone in parent_groups:
                parent_groups[parent_zone]["children"].append(zone_data)
            else:
                parent_groups[parent_zone] = {
                    "group": parent_group,
                    "children": [zone_data],
                }
                root_node = parent_groups[parent_zone]

        # Step 3: Format the output recursively
        def clean_node(node):
            # If it's a Segment node, it's already cleaned
            if node.get("type") == "Segment" or node.get("type") == "CustomSegment":
                return node
            group = node["group"]
            return {
                "name": f'{group.simulationObject.componentName} ({group.abstractionType})',
                "type": group.abstractionType,
                "custom": False,
                "t_supply": None,
                "t_target": None,
                "p_supply": None,
                "p_target": None,
                "h_supply": None,
                "h_target": None,
                "heat_flow": None,
                "dt_cont": None,
                "htc": None,
                "children": [clean_node(child) for child in node["children"]],
            }

        if root_node is None:
            return Response(data={})

        cleaned = clean_node(root_node)
        return Response(CustomSegmentSerializer(cleaned).data)

    @extend_schema(request=CreateSegmentSerializer, responses=None)
    @action(methods=['post'], detail=False, url_path='create-new-segment')
    def create_segment(self, request):
        """Create one custom segment under an existing zone.

        The body is validated with ``CreateSegmentSerializer``; the flowsheet
        is taken from the ``flowsheet`` query parameter.

        Returns:
            Response: the new segment serialized with ``SegmentSerializer``
            and status 201.

        Raises:
            ValidationError: when the request body is invalid.
            Flowsheet.DoesNotExist / Grouping.DoesNotExist: lookups are not
            guarded here, unlike in ``bulk_create`` and ``delete_all``.
        """
        serializer = CreateSegmentSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        flowsheet = Flowsheet.objects.get(
            pk=request.query_params.get("flowsheet"))
        parentZone = serializer.validated_data.pop("parentZone")
        parentGroup = Grouping.objects.get(
            simulationObject__componentName=parentZone,
            flowsheet=flowsheet,
        )

        stream_data_entry = StreamDataEntry.objects.create(
            flowsheet=flowsheet,
            custom=True,
            group=parentGroup,
            streamDataProject=flowsheet.StreamDataProject,
        )

        segment = Segment.objects.create(
            stream_data_entry=stream_data_entry,
            flowsheet=flowsheet,
            **serializer.validated_data
        )
        # The area is computed from the stored segment and persisted on its own.
        segment.area = segment._calc_area()
        segment.save(update_fields=["area"])

        return Response(SegmentSerializer(segment).data, status=201)

    @staticmethod
    def _zone_group(flowsheet, zone):
        """Return the ``Grouping`` named ``zone`` on ``flowsheet``, creating it if absent.

        A newly created grouping is placed under the flowsheet's root
        grouping, hidden, and marked as ``AbstractionType.Zone``.
        """
        try:
            return Grouping.objects.get(
                simulationObject__componentName=zone,
                flowsheet=flowsheet,
            )
        except Grouping.DoesNotExist:
            # This should probably be handled by a bulk create within the
            # upload process for open pinch.
            parentGroup = Grouping.create(
                flowsheet=flowsheet,
                group=flowsheet.rootGrouping,
                componentName=zone,
                visible=False
            )
            parentGroup.abstractionType = AbstractionType.Zone
            parentGroup.save()
            return parentGroup

    @extend_schema(request=BulkCreateStreamsSerializer, responses=None)
    @action(methods=['post'], detail=False, url_path='bulk-create')
    def bulk_create(self, request):
        """Create many custom segments, grouped by their ``parentZone``.

        Each stream dictionary must contain ``parentZone``; every other key is
        passed to the ``Segment`` constructor. One ``StreamDataEntry`` is
        created per distinct zone (in order of first appearance) and missing
        zones are created on the fly.

        Returns:
            Response: ``{'status': 'success'}`` with status 201, or the
            payload from ``error_response`` (status 400) on any exception,
            including validation errors and a missing ``parentZone`` key.
        """
        try:
            serializer = BulkCreateStreamsSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)
            flowsheet = Flowsheet.objects.get(
                pk=request.query_params.get("flowsheet"))

            # Group once, preserving first-seen zone order, instead of
            # re-scanning the whole stream list for every zone.
            streams_by_zone = {}
            for stream in serializer.validated_data.get("streams"):
                streams_by_zone.setdefault(stream["parentZone"], []).append(stream)

            stream_objects = []
            for zone, zone_streams in streams_by_zone.items():
                stream_data_entry = StreamDataEntry.objects.create(
                    flowsheet=flowsheet,
                    custom=True,
                    group=self._zone_group(flowsheet, zone),
                    streamDataProject=flowsheet.StreamDataProject,
                )
                for stream in zone_streams:
                    # Copy without parentZone rather than mutating the
                    # validated payload.
                    segment_fields = {
                        key: value
                        for key, value in stream.items()
                        if key != "parentZone"
                    }
                    segment = Segment(
                        flowsheet=flowsheet,
                        stream_data_entry=stream_data_entry,
                        **segment_fields
                    )
                    segment.area = segment._calc_area()
                    stream_objects.append(segment)

            Segment.objects.bulk_create(stream_objects)

            return Response({'status': 'success'}, status=201)
        except Exception as e:
            return self.error_response(e)

    @action(methods=['post'], detail=False, url_path='delete-all')
    def delete_all(self, request):
        """Delete the pinch utilities of the flowsheet's stream data project.

        The flowsheet is taken from the ``flowsheet`` query parameter.

        NOTE(review): despite the name and the viewset it lives on, this
        removes ``project.Inputs.PinchUtilities`` and does not touch
        ``Segment`` rows directly - confirm that this is intended.
        NOTE(review): a body is sent together with status 204 - confirm
        clients do not rely on it.

        Returns:
            Response: ``{'status': 'success'}`` with status 204, or the
            payload from ``error_response`` (status 400) on any exception.
        """
        try:
            flowsheet = Flowsheet.objects.get(
                pk=request.query_params.get("flowsheet"))
            project = flowsheet.StreamDataProject

            project.Inputs.PinchUtilities.all().delete()

            return Response({'status': 'success'}, status=204)
        except Exception as e:
            return self.error_response(e)

    def error_response(self, e):
        """Turn exception ``e`` into a 400 response.

        Must be called from inside the ``except`` block that caught ``e`` so
        that ``traceback.format_exc()`` can see the active exception.

        NOTE(review): the full traceback is returned to the client, which
        exposes internal details - consider logging it server-side instead.
        """
        response_data = {
            'status': 'error',
            'message': str(e),
            'traceback': traceback.format_exc(),
        }
        return Response(response_data, status=400)