Coverage for backend/django/core/auxiliary/viewsets/PropertyInfoViewSet.py: 87%

43 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1from core.viewset import ModelViewSet, ReadOnlyModelViewSet 

2from rest_framework import serializers 

3from ..serializers.PropertyInfoSerializer import PropertyInfoSerializer, PropertyHistorySerializer 

4from ..models import PropertyInfo, HistoricalValue 

5from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes 

6from rest_framework.decorators import action 

7from rest_framework.response import Response 

8from rest_framework.exceptions import NotFound, ValidationError 

9from core.auxiliary.property_events import ( 

10 property_info_deleted, 

11 property_info_deleting, 

12 property_info_saved, 

13) 

14from core.auxiliary.property_state import reject_property_update 

15 

16class CompoundUpdateSerializer(serializers.Serializer): 

17 basis = serializers.CharField() 

18 flowType = serializers.CharField() 

19 value = serializers.CharField(allow_blank=True) 

20 

21class PropertyInfoViewSet(ModelViewSet): 

22 serializer_class = PropertyInfoSerializer 

23 

24 def get_queryset(self): 

25 return PropertyInfo.objects.all() 

26 

27 def perform_update(self, serializer): 

28 property_info = serializer.save() 

29 property_info_saved(property_info) 

30 

31 def perform_destroy(self, instance): 

32 reject_property_update(instance, {}, action="delete") 

33 property_info_deleting(instance) 

34 response = super().perform_destroy(instance) 

35 property_info_deleted(instance) 

36 return response 

37 

38class HistoryViewSet(ReadOnlyModelViewSet): 

39 serializer_class = PropertyHistorySerializer 

40 

41 @extend_schema( 

42 parameters=[ 

43 OpenApiParameter(name="property", required=True, type=OpenApiTypes.INT), 

44 ] 

45 ) 

46 def get_queryset(self): 

47 # Filter to only the history of a specific property, e.g material stream flowrate 

48 queryset = HistoricalValue.objects.all() 

49 property = self.request.query_params.get("property") 

50 if property is not None: 50 ↛ 51line 50 didn't jump to line 51 because the condition on line 50 was never true

51 queryset = queryset.filter(property=property) 

52 return queryset 

53 

54 

55 @extend_schema( 

56 parameters=[ 

57 OpenApiParameter(name="flowsheet", required=True, type=OpenApiTypes.INT), 

58 ] 

59 ) 

60 @action(detail=False, methods=['delete']) 

61 def delete(self,request): 

62 # Delete all  

63 id = self.request.query_params.get("flowsheet") 

64 HistoricalValue.objects.select_related("property__set__setOwner__id").filter(property__set__setOwner__flowsheet__id=id).delete() 

65 HistoricalValue.objects.select_related("property__set__msSetOwner__id").filter(property__set__msSetOwner__flowsheet__id=id).delete() 

66 return Response(status=204)