Coverage for backend/core/auxiliary/viewsets/PropertySetViewSet.py: 53%
180 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1import traceback
2from core.auxiliary.enums.unitOpGraphics import ConType
3from core.viewset import ModelViewSet
4from rest_framework.decorators import action
5from rest_framework.response import Response
6from rest_framework import serializers
7from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes
9from ..serializers.PropertyInfoSerializer import PropertySetSerializer
10from ..models.PropertySet import PropertySet
11from ..models.PropertyInfo import PropertyInfo
12from ..models.PropertyValue import PropertyValue
13from core.auxiliary.enums.uiEnums import CompoundMode
14from core.auxiliary.enums.unitOpData import turbine_configs, hx_configs
15from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
16from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
17from flowsheetInternals.unitops.config.config_base import configuration
19from .compound_conversions import compound_db_to_molar_flow, serialize_to_current_mode
class UpdateCompoundsSerializer(serializers.Serializer):
    """Request body pairing a property set with a list of compounds."""

    # pk of property set
    propertySet = serializers.IntegerField(required=True)
    # Element type is not constrained here; any list is accepted.
    compounds = serializers.ListField(required=True)
class UpdateCompoundModeSerializer(serializers.Serializer):
    """Request body selecting a compound mode from ``CompoundMode.choices``."""

    compoundMode = serializers.ChoiceField(
        required=True,
        choices=CompoundMode.choices,
    )
class ResetPropertyInfoValuesSerializer(serializers.Serializer):
    """Request body carrying a single required integer ``property_set``."""

    # presumably the pk of a PropertySet -- confirm against the caller
    property_set = serializers.IntegerField(required=True)
class UpdateTurbineRequest(serializers.Serializer):
    """Request body for the ``update-turbine-type`` action."""

    # Checked against ``turbine_configs`` by the view, not by this serializer.
    turbine_type = serializers.CharField(required=True)
    # Id of the SimulationObject to replace.
    simulation_object = serializers.IntegerField(required=True)
class UpdateHXRequest(serializers.Serializer):
    """Request body for the ``update-hx-type`` action."""

    # Checked against ``hx_configs`` by the view, not by this serializer.
    hx_type = serializers.CharField(required=True)
    # Id of the SimulationObject to replace.
    simulation_object = serializers.IntegerField(required=True)
class SchemaPropertySetViewSet(ModelViewSet):
    """Viewset for ``PropertySet`` with compound-mode and unit-op type actions.

    Besides the inherited model endpoints it exposes:

    * ``update_compound_mode`` -- store a new compound mode and refresh the
      display values of ``mole_frac_comp``.
    * ``normalize_compound_values`` -- rescale the compound fractions of one
      property set.
    * ``update-turbine-type`` / ``update-hx-type`` -- replace a simulation
      object with a freshly created one of another type, re-attaching the
      old object's streams and graphic position.
    """

    serializer_class = PropertySetSerializer

    # Error text used whenever a compound fraction is blank or zero.
    _MISSING_COMPOUND_VALUE_MESSAGE = (
        "Please specify all values for compounds, or remove them from the stream."
    )

    def get_queryset(self):
        """Return all property sets with their nested values prefetched."""
        return PropertySet.objects.all().prefetch_related(
            "ContainedProperties__values__indexedItems"
        )

    @extend_schema(
        parameters=[
            OpenApiParameter(name="id", required=True, type=OpenApiTypes.INT),
        ],
        request=UpdateCompoundModeSerializer,
    )
    @action(detail=False, methods=['PUT'])
    def update_compound_mode(self, request):
        """Persist a new compound mode and refresh the matching display values.

        The property set is selected by the ``id`` query parameter. The body
        is validated with ``UpdateCompoundModeSerializer`` (the serializer the
        schema already advertises), so a missing or unknown ``compoundMode``
        yields a 400 validation error instead of an unhandled ``KeyError``.

        Returns:
            Response: ``{"compoundMode": <mode>}``.
        """
        # TODO: refactor to remove this to partial update
        payload = UpdateCompoundModeSerializer(data=request.data)
        payload.is_valid(raise_exception=True)
        compound_mode = payload.validated_data["compoundMode"]

        property_set = PropertySet.objects.get(id=request.query_params.get("id"))
        property_set.compoundMode = compound_mode
        property_set.save()

        if compound_mode == "MolarFraction":
            # Raw values are shown as-is: copy value -> displayValue.
            mole_frac_comp = property_set.get_property("mole_frac_comp")
            property_values = list(mole_frac_comp.values.all())
            for prop in property_values:
                prop.displayValue = prop.value
            PropertyValue.objects.bulk_update(property_values, ["displayValue"])
        elif compound_mode == "MassFraction":
            # Convert molar fractions to mass fractions for display.
            from .compound_conversions import update_fraction_display_values
            update_fraction_display_values(property_set)

        return Response({"compoundMode": compound_mode})

    @staticmethod
    def _is_blank_or_zero(raw):
        """Return True when *raw* is ``None``, ``""`` or numerically zero.

        Unlike ``raw in [None, "", 0]`` this also catches string zeros such as
        ``"0"`` and ``"0.0"``. Text that is not numeric returns False so the
        later ``float()`` conversion reports it.
        """
        if raw is None or raw == "":
            return True
        try:
            return float(raw) == 0
        except (TypeError, ValueError):
            return False

    @extend_schema(
        request=None
    )
    @action(detail=True, methods=['POST'])
    def normalize_compound_values(self, request, pk=None):
        """Rescale the ``mole_frac_comp`` values of property set *pk*.

        Every display value is divided by the sum of the stored ``value``
        fields. In ``MolarFraction`` mode the stored value then mirrors the
        display value. In ``MassFraction`` mode each display value is passed
        through ``compound_db_to_molar_flow`` and the results are rescaled to
        sum to one. Any other mode saves the values unchanged.

        Returns:
            Response: a success message, or the payload of ``error_response``
            (status 400) if anything raises -- including a blank or zero
            compound value.
        """
        try:
            property_set = PropertySet.objects.get(id=pk)
            mole_frac_comp = property_set.get_property("mole_frac_comp")
            compound_mode = property_set.compoundMode

            # Materialise once: the objects mutated below must be the very
            # same instances that bulk_update writes back.
            value_objs = list(mole_frac_comp.values.all())

            # NOTE(review): the divisor is summed from ``value`` while
            # ``displayValue`` is what gets divided -- confirm the two are
            # always in the same basis when this action is called.
            sum_frac_comp = sum(
                float(prop.value) if prop.value not in [None, ""] else 0
                for prop in value_objs
            )

            if compound_mode in ("MolarFraction", "MassFraction"):
                # Validate everything before mutating anything.
                # Allowing zero currently gives an infeasible error on solve.
                for prop in value_objs:
                    if self._is_blank_or_zero(prop.displayValue):
                        raise ValueError(self._MISSING_COMPOUND_VALUE_MESSAGE)
                if sum_frac_comp == 0:
                    raise ValueError(
                        "Compound values sum to zero and cannot be normalized."
                    )
                for prop in value_objs:
                    prop.displayValue = str(float(prop.displayValue) / sum_frac_comp)

            if compound_mode == "MolarFraction":
                for prop in value_objs:
                    prop.value = prop.displayValue
            elif compound_mode == "MassFraction":
                # Mass basis: convert each display value, then rescale the
                # converted values so they sum to one.
                molar_flows = {}
                for prop in value_objs:
                    key = prop.get_index("compound").key
                    prop.value = compound_db_to_molar_flow(key, float(prop.displayValue))
                    molar_flows[key] = float(prop.value)

                total_molar_flow = sum(molar_flows.values())
                for prop in value_objs:
                    flow = molar_flows[prop.get_index("compound").key]
                    if flow == 0:
                        raise ValueError(self._MISSING_COMPOUND_VALUE_MESSAGE)
                    prop.value = flow / total_molar_flow

            PropertyValue.objects.bulk_update(value_objs, ["value", "displayValue"])

            return Response({
                "message": "Compound values normalized successfully",
            })
        except Exception as e:
            return self.error_response(e)

    @staticmethod
    def _reconnect_ports_by_key(old_ports, new_ports):
        """Give each new port the stream of the old port with the same key."""
        for old_port in old_ports:
            new_port = new_ports.get(key=old_port.key)
            new_port.stream = old_port.stream
            new_port.save()

    @staticmethod
    def _copy_graphic_position(old_graphic, new_object):
        """Place the first graphic of *new_object* at *old_graphic*'s x/y."""
        new_graphic = new_object.graphicObject.first()
        new_graphic.x = old_graphic.x
        new_graphic.y = old_graphic.y
        new_graphic.save()

    @extend_schema(request=UpdateTurbineRequest, responses=None)
    @action(detail=False, methods=['POST'], url_path='update-turbine-type')
    def update_turbine_type(self, request):
        """Replace a turbine with a new object of ``turbine_type``.

        Returns:
            Response: the new object's id (200), or a 400 message when
            ``turbine_type`` is not a key of ``turbine_configs``.
        """
        serializer = UpdateTurbineRequest(data=request.data)
        serializer.is_valid(raise_exception=True)
        validated_data = serializer.validated_data
        turbine_type = validated_data.get("turbine_type")
        if turbine_type not in turbine_configs:
            return Response("Turbine type is not valid", status=400)

        old_turbine = SimulationObject.objects.get(id=validated_data.get("simulation_object"))
        old_graphic = old_turbine.graphicObject.first()
        new_turbine = self.create_simulation_object(old_turbine, turbine_type)

        # The inlet is matched by direction alone; ``.get`` requires exactly
        # one inlet port on both the old and the new object.
        new_inlet_port = new_turbine.ports.get(direction=ConType.Inlet)
        new_inlet_port.stream = old_turbine.ports.get(direction=ConType.Inlet).stream
        new_inlet_port.save()

        self._reconnect_ports_by_key(
            old_turbine.ports.filter(direction=ConType.Outlet),
            new_turbine.ports.filter(direction=ConType.Outlet),
        )
        self._copy_graphic_position(old_graphic, new_turbine)

        old_turbine.permanently_delete()
        return Response(new_turbine.id, status=200)

    @extend_schema(request=UpdateHXRequest, responses=None)
    @action(detail=False, methods=['POST'], url_path='update-hx-type')
    def update_hx_type(self, request):
        """Replace a heat exchanger with a new object of ``hx_type``.

        Returns:
            Response: the new object's id (200), or a 400 message when
            ``hx_type`` is not a key of ``hx_configs``.
        """
        serializer = UpdateHXRequest(data=request.data)
        serializer.is_valid(raise_exception=True)
        validated_data = serializer.validated_data
        hx_type = validated_data.get("hx_type")
        if hx_type not in hx_configs:
            return Response("hx type is not valid", status=400)

        old_hx = SimulationObject.objects.get(id=validated_data.get("simulation_object"))
        old_graphic = old_hx.graphicObject.first()
        new_hx = self.create_simulation_object(old_hx, hx_type)

        # Inlets first, then outlets; both are paired by port key.
        for direction in (ConType.Inlet, ConType.Outlet):
            self._reconnect_ports_by_key(
                old_hx.ports.filter(direction=direction),
                new_hx.ports.filter(direction=direction),
            )
        self._copy_graphic_position(old_graphic, new_hx)

        old_hx.permanently_delete()
        return Response(new_hx.id, status=200)

    def create_simulation_object(self, simulation_object, obj_type):
        """Create an ``obj_type`` object alongside *simulation_object*.

        The new object reuses the flowsheet, component name and parent group
        of *simulation_object* and is created at coordinates (0, 0).

        Returns:
            The object returned by ``SimulationObjectFactory.create``, after
            ``perform_bulk_create`` has run.
        """
        factory = SimulationObjectFactory()
        new_object = factory.create(
            object_type=obj_type,
            object_schema=configuration[obj_type],
            coordinates={"x": 0, "y": 0},
            flowsheet=simulation_object.flowsheet,
            componentName=simulation_object.componentName,
            parentGroup=simulation_object.get_group(),
        )
        factory.perform_bulk_create()
        return new_object

    def error_response(self, e):
        """Wrap exception *e* in a 400 response with message and traceback.

        Must be called while the exception is being handled, because
        ``traceback.format_exc()`` reads the active exception.
        """
        # NOTE(review): the full traceback is returned to the client; confirm
        # this is acceptable outside development environments.
        response_data = {
            'status': 'error',
            'message': str(e),
            'traceback': traceback.format_exc(),
        }
        return Response(response_data, status=400)

    def retrieve(self, request, *args, **kwargs):
        """Return one serialized property set.

        When the owning simulation object is a stream, the serialized
        ``ContainedProperties`` are first passed to
        ``serialize_to_current_mode``.
        """
        property_set = self.get_object()
        serializer = self.get_serializer(property_set)
        if property_set.simulationObject.is_stream():
            # ``serializer.data`` is cached, so in-place changes made by the
            # helper are what the response below returns.
            serialize_to_current_mode(property_set, serializer.data["ContainedProperties"])
        return Response(serializer.data)