Coverage for backend/PinchAnalysis/views/SegmentViewSet.py: 32%
143 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1from core.auxiliary.enums.generalEnums import AbstractionType
2from core.viewset import ModelViewSet
3from PinchAnalysis.models.InputModels import PinchInputs, Segment, StreamDataEntry
4from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes
5from rest_framework.response import Response
6from rest_framework import serializers
7from rest_framework.decorators import action
8import traceback
9from PinchAnalysis.models.StreamDataProject import StreamDataProject
10from flowsheetInternals.graphicData.models.groupingModel import Grouping
11from PinchAnalysis.serializers.PinchInputSerializers import SegmentSerializer
12from core.auxiliary.models.Flowsheet import Flowsheet
class BulkCreateStreamsSerializer(serializers.Serializer):
    """Request payload accepted by the ``bulk-create`` action.

    ``streams`` is a list of free-form dictionaries. The ``bulk-create``
    handler reads a ``"parentZone"`` key from each entry and forwards the
    remaining keys as ``Segment`` constructor arguments.
    """

    # Validated as part of the payload; the handler itself locates the
    # flowsheet through the ``flowsheet`` query parameter.
    projectID = serializers.IntegerField(required=True)
    streams = serializers.ListField(required=True, child=serializers.DictField())
class DeleteAllStreamsSerializer(serializers.Serializer):
    """Request payload accepted by the ``delete-all`` action."""

    # Primary key used to look up the ``StreamDataProject`` whose segments
    # are deleted.
    projectID = serializers.IntegerField(required=True)
class CustomSegmentSerializer(serializers.Serializer):
    """Shape of one node in the tree returned by ``SegmentViewSet.list``.

    The same layout is used for grouping nodes and for segment leaves;
    grouping nodes carry ``None`` in the numeric fields and nest further
    nodes under ``children``.
    """

    # --- identity -----------------------------------------------------
    id = serializers.IntegerField(allow_null=True)
    name = serializers.CharField()
    custom = serializers.BooleanField()
    type = serializers.CharField()

    # --- supply / target values (all nullable) ------------------------
    t_supply = serializers.FloatField(allow_null=True)
    t_target = serializers.FloatField(allow_null=True)
    p_supply = serializers.FloatField(allow_null=True)
    p_target = serializers.FloatField(allow_null=True)
    h_supply = serializers.FloatField(allow_null=True)
    h_target = serializers.FloatField(allow_null=True)

    # --- remaining numeric values (all nullable) ----------------------
    heat_flow = serializers.FloatField(allow_null=True)
    dt_cont = serializers.FloatField(allow_null=True)
    htc = serializers.FloatField(allow_null=True)

    # Nested nodes, passed through as plain dictionaries.
    children = serializers.ListField(child=serializers.DictField())
class CreateSegmentSerializer(serializers.Serializer):
    """Request payload accepted by the ``create-new-segment`` action.

    Every field is mandatory. ``parentZone`` is matched against a
    ``Grouping``'s ``simulationObject.componentName``; the other fields are
    passed straight to ``Segment.objects.create``.
    """

    name = serializers.CharField(required=True)

    # Numeric values stored on the new segment.
    t_supply = serializers.FloatField(required=True)
    t_target = serializers.FloatField(required=True)
    heat_flow = serializers.FloatField(required=True)
    dt_cont = serializers.FloatField(required=True)
    htc = serializers.FloatField(required=True)

    # Component name of the grouping the segment is attached to.
    parentZone = serializers.CharField(required=True)
class SegmentViewSet(ModelViewSet):
    """Endpoints for listing, creating and deleting pinch-analysis segments.

    On top of what ``ModelViewSet`` provides, this class overrides ``list``
    to return a grouping/segment tree and adds three POST actions:
    ``create-new-segment``, ``bulk-create`` and ``delete-all``.
    """

    serializer_class = SegmentSerializer

    def get_queryset(self):
        """Return all ``Segment`` rows; no filtering is applied here."""
        return Segment.objects.all()

    @extend_schema(
        responses=CustomSegmentSerializer
    )
    def list(self, request):
        """Return the segments arranged as a tree of groupings.

        Returns:
            A ``Response`` holding the root node serialized with
            ``CustomSegmentSerializer``, or an empty object when no root
            node could be determined (for example when there are no stream
            data entries).
        """
        all_groups = {}

        # Step 1: one node per zone, with that zone's segments as leaves.
        for stream_data_project in StreamDataProject.objects.all():
            for stream_data_entry in stream_data_project.StreamDataEntries.all():
                zone = stream_data_entry.zone
                group = stream_data_entry.group

                # The segments are filtered by this entry, so its ``custom``
                # flag is read from the entry directly instead of going
                # back through ``segment.stream_data_entry`` per segment.
                is_custom = stream_data_entry.custom
                segments = Segment.objects.filter(
                    stream_data_entry=stream_data_entry)
                segment_children = [{
                    "id": segment.id,
                    "name": segment.name,
                    "type": "Segment",
                    "custom": is_custom,
                    "t_supply": segment.t_supply,
                    "t_target": segment.t_target,
                    "p_supply": segment.p_supply,
                    "p_target": segment.p_target,
                    "h_supply": segment.h_supply,
                    "h_target": segment.h_target,
                    "heat_flow": segment.heat_flow,
                    "dt_cont": segment.dt_cont,
                    "htc": segment.htc,
                    "children": [],  # Segments don't have children
                } for segment in segments]

                if zone not in all_groups:
                    all_groups[zone] = {
                        "group": group,
                        "stream_data_entry": stream_data_entry,
                        "children": [],
                    }

                # Several entries may share a zone; their segments are merged.
                all_groups[zone]["children"].extend(segment_children)

        # Step 2: link each zone under its parent grouping.
        # NOTE(review): the parent-group handling below is a known temporary
        # fix. ``root_node`` ends up as whichever candidate is assigned last,
        # so inputs with several top-level groupings only expose one of them.
        root_node = None
        parent_groups = {}
        for zone_data in all_groups.values():
            parent_group = zone_data["group"].get_parent_group()
            if not parent_group:
                # A grouping without a parent is itself a root candidate.
                root_node = zone_data
                continue

            parent_zone = parent_group.simulationObject.componentName
            if parent_zone in all_groups:
                all_groups[parent_zone]["children"].append(zone_data)
            elif parent_zone in parent_groups:
                parent_groups[parent_zone]["children"].append(zone_data)
            else:
                # The parent has no stream data of its own: synthesize a
                # node for it and treat it as a root candidate.
                parent_groups[parent_zone] = {
                    "group": parent_group,
                    "children": [zone_data],
                }
                root_node = parent_groups[parent_zone]

        # Step 3: convert grouping nodes to the serializer's shape.
        def clean_node(node):
            # Segment leaves were already built in their final shape.
            if node.get("type") in ("Segment", "CustomSegment"):
                return node
            node_group = node["group"]
            return {
                "name": f'{node_group.simulationObject.componentName} ({node_group.abstractionType})',
                "type": node_group.abstractionType,
                "custom": False,
                "t_supply": None,
                "t_target": None,
                "p_supply": None,
                "p_target": None,
                "h_supply": None,
                "h_target": None,
                "heat_flow": None,
                "dt_cont": None,
                "htc": None,
                "children": [clean_node(child) for child in node["children"]],
            }

        if root_node is None:
            return Response(data={})

        cleaned = clean_node(root_node)
        return Response(CustomSegmentSerializer(cleaned).data)

    @extend_schema(request=CreateSegmentSerializer, responses=None)
    @action(methods=['post'], detail=False, url_path='create-new-segment')
    def create_segment(self, request):
        """Create one custom segment under an existing grouping.

        The flowsheet is identified by the ``flowsheet`` query parameter and
        the grouping by the payload's ``parentZone``.

        Returns:
            201 with the serialized segment on success. A failed lookup of
            the flowsheet, grouping or stream data project yields the same
            400 error payload the other actions of this class return.

        Raises:
            rest_framework.exceptions.ValidationError: if the payload does
                not satisfy ``CreateSegmentSerializer``.
        """
        serializer = CreateSegmentSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        segment_fields = dict(serializer.validated_data)
        parent_zone = segment_fields.pop("parentZone")

        # Only the read-only lookups are guarded, so a bad id or zone name
        # becomes an error payload. The writes below stay outside the
        # ``try`` and fail the same way they did before.
        try:
            flowsheet = Flowsheet.objects.get(
                pk=request.query_params.get("flowsheet"))
            parent_group = Grouping.objects.get(
                simulationObject__componentName=parent_zone
            )
            stream_data_project = flowsheet.StreamDataProject
        except Exception as e:
            return self.error_response(e)

        stream_data_entry = StreamDataEntry.objects.create(
            flowsheet=flowsheet,
            custom=True,
            group=parent_group,
            streamDataProject=stream_data_project,
        )

        segment = Segment.objects.create(
            stream_data_entry=stream_data_entry,
            **segment_fields
        )

        return Response(SegmentSerializer(segment).data, status=201)

    @extend_schema(request=BulkCreateStreamsSerializer, responses=None)
    @action(methods=['post'], detail=False, url_path='bulk-create')
    def bulk_create(self, request):
        """Create many custom segments at once, grouped by ``parentZone``.

        A grouping is created for every zone name that does not exist yet,
        then one ``StreamDataEntry`` is created per zone and all segments
        are inserted with a single ``bulk_create``.

        Returns:
            201 with ``{'status': 'success'}``, or the 400 error payload
            from ``error_response`` if anything raises.
        """
        try:
            serializer = BulkCreateStreamsSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)
            streams = serializer.validated_data.get("streams")

            flowsheet = Flowsheet.objects.get(
                pk=request.query_params.get("flowsheet"))

            # Group the streams by zone in one pass. Dicts keep insertion
            # order, so zones are processed in first-seen order and the
            # streams inside a zone keep their payload order.
            streams_by_zone = {}
            for stream in streams:
                streams_by_zone.setdefault(stream["parentZone"], []).append(stream)

            stream_objects = []
            for zone, zone_streams in streams_by_zone.items():
                if not Grouping.objects.filter(simulationObject__componentName=zone).exists():
                    # This should probably be handled by a bulk create within
                    # the upload process for open pinch.
                    parent_group = Grouping.create(
                        flowsheet=flowsheet,
                        group=Grouping.objects.get(simulationObject__componentName="Flowsheet"),
                        componentName=zone,
                        visible=False
                    )
                    parent_group.abstractionType = AbstractionType.Zone
                    parent_group.save()
                else:
                    parent_group = Grouping.objects.get(
                        simulationObject__componentName=zone
                    )

                stream_data_entry = StreamDataEntry.objects.create(
                    flowsheet=flowsheet,
                    custom=True,
                    group=parent_group,
                    streamDataProject=flowsheet.StreamDataProject,
                )

                for stream in zone_streams:
                    # Copy without "parentZone" so the payload is not mutated.
                    segment_fields = {
                        key: value
                        for key, value in stream.items()
                        if key != "parentZone"
                    }
                    stream_objects.append(Segment(
                        # ``flowsheet_id`` takes the primary key, not the
                        # model instance.
                        flowsheet_id=flowsheet.pk,
                        stream_data_entry=stream_data_entry,
                        **segment_fields
                    ))

            Segment.objects.bulk_create(stream_objects)

            return Response({'status': 'success'}, status=201)
        except Exception as e:
            return self.error_response(e)

    @extend_schema(request=DeleteAllStreamsSerializer, responses=None)
    @action(methods=['post'], detail=False, url_path='delete-all')
    def delete_all(self, request):
        """Delete every segment reachable from a project's inputs.

        Returns:
            204 with ``{'status': 'success'}``, or the 400 error payload
            from ``error_response`` if anything raises.
        """
        try:
            serializer = DeleteAllStreamsSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)

            project = StreamDataProject.objects.get(
                pk=serializer.validated_data.get("projectID"))
            project.Inputs.Segments.all().delete()

            # NOTE(review): HTTP 204 is defined as carrying no body, so
            # clients may never see this payload. Status and body are kept
            # as they were for compatibility.
            return Response({'status': 'success'}, status=204)
        except Exception as e:
            return self.error_response(e)

    def error_response(self, e):
        """Build the 400 response used by the actions' ``except`` branches.

        Must be called while the exception is being handled, because
        ``traceback.format_exc()`` formats the exception currently in flight.

        Args:
            e: The exception that was caught.

        Returns:
            A 400 ``Response`` with ``status``, ``message`` and ``traceback``.
        """
        # NOTE(review): the full traceback is sent to the client, which
        # exposes server internals. Left unchanged because callers may read
        # the ``traceback`` key; consider limiting it to debug builds.
        response_data = {
            'status': 'error',
            'message': str(e),
            'traceback': traceback.format_exc(),
        }
        return Response(response_data, status=400)