Coverage for backend/django/core/auxiliary/models/Flowsheet.py: 96%
85 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1from django.db import models
2from django.db.models import Prefetch, QuerySet
3from django.utils import timezone
5from CoreRoot import settings
6from core.auxiliary.enums.FlowsheetTemplateType import FlowsheetTemplateType
7from authentication.user.models import User
8from authentication.user.AccessTable import AccessTable
9import core.auxiliary.enums.ViewType as ViewType
class Flowsheet(models.Model):
    """A flowsheet project together with its ownership and sharing helpers.

    Access for individual users is recorded in ``AccessTable`` rows, reachable
    from a flowsheet through the ``access_list`` relation. The classmethods
    below that take an ``owner`` argument look the flowsheet up with
    ``owner=owner``, so they raise ``Flowsheet.DoesNotExist`` when the given
    user does not own the flowsheet.
    """

    name = models.CharField(max_length=64, default="Flowsheet")
    buildVersion = models.CharField(max_length=32, default=None, null=True)
    buildDate = models.CharField(max_length=32, default=None, null=True)
    # Set to the current time by create() and refreshed by set_saved_date().
    savedDate = models.DateTimeField(null=True)
    # Root Grouping of the flowsheet; assigned by create() once the row exists.
    rootGrouping = models.ForeignKey(
        "flowsheetInternals_graphicData.Grouping",
        default=None,
        null=True,
        on_delete=models.CASCADE,
        related_name="flowsheets",
    )
    owner = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name="flowsheets",
        null=True,
    )
    # Anything other than NotTemplate is excluded by
    # get_flowsheets_by_view_type().
    flowsheet_template_type = models.CharField(
        max_length=32,
        choices=FlowsheetTemplateType.choices,
        default=FlowsheetTemplateType.NotTemplate,
    )
    created_at = models.DateTimeField(auto_now_add=True)
    # NOTE(review): presumably the time the flowsheet was moved to the bin;
    # nothing in this class writes it - confirm against the callers.
    binned_at = models.DateTimeField(null=True)
    is_starred = models.BooleanField(default=False)
    is_binned = models.BooleanField(default=False)

    @classmethod
    def create(cls, **kwargs):
        """Create a flowsheet with its owner access row, root group and
        stream data project.

        Recognised keyword arguments are ``name``, ``buildVersion``,
        ``buildDate`` and ``owner``; any other keyword is ignored. A missing
        ``name`` defaults to ``Project-<n>`` where ``n`` is the current number
        of flowsheets plus one.

        All writes happen inside a single transaction so that a failure in
        any step does not leave a partially initialised flowsheet behind.

        Returns:
            The newly created ``Flowsheet`` instance.
        """
        # Imported locally, like the two model imports below, to keep the
        # module-level import block untouched.
        from django.db import transaction
        from flowsheetInternals.graphicData.models.groupingModel import Grouping
        from PinchAnalysis.models.StreamDataProject import StreamDataProject

        project_number = cls.objects.count() + 1
        saved_date = timezone.now()

        field_values = {
            'name': kwargs.get('name', f'Project-{project_number}'),
            'buildVersion': kwargs.get('buildVersion', 'Not set'),
            'buildDate': kwargs.get('buildDate', 'No Build Date Set'),
            'savedDate': saved_date,
            'owner': kwargs.get('owner'),
        }

        with transaction.atomic():
            instance = cls.objects.create(**field_values)

            # The owner gets a writable access row of their own.
            AccessTable.objects.create(
                user=instance.owner,
                flowsheet=instance,
                read_only=False,
            )

            # Create the root group and attach it to the flowsheet.
            root_group = Grouping.create(
                instance,
                group=None,
                componentName="Flowsheet",
                visible=True,
                isRoot=True,
            )
            instance.rootGrouping = root_group
            instance.save()

            StreamDataProject.create(root_group, flowsheet=instance)

        return instance

    def set_saved_date(self):
        """Set ``savedDate`` to the current time and save the instance."""
        self.savedDate = timezone.now()
        self.save()

    @classmethod
    def with_user_access(cls, queryset: QuerySet, user: User) -> QuerySet:
        """Prefetch the given user's access rows onto each flowsheet.

        For an authenticated user, every flowsheet in the returned queryset
        carries a ``current_user_access_entries`` list holding the
        ``AccessTable`` rows of that user for that flowsheet. When ``user`` is
        ``None`` or not authenticated the queryset is returned unchanged, so
        the attribute is not set.
        """
        if user is None or not user.is_authenticated:
            return queryset

        return queryset.prefetch_related(
            Prefetch(
                "access_list",
                queryset=AccessTable.objects.filter(user=user),
                to_attr="current_user_access_entries",
            )
        )

    @classmethod
    def share_flowsheet(cls, owner, flowsheet_id, user_email, read_only=False):
        """Share a flowsheet with a user by their email address.

        Args:
            owner: User who must own the flowsheet.
            flowsheet_id: Primary key of the flowsheet to share.
            user_email: Email address of the user receiving access.
            read_only: Whether the new access row is read-only.

        Raises:
            Flowsheet.DoesNotExist: No flowsheet with this id is owned by
                ``owner``.
            User.DoesNotExist: No user has the given email address.
            ValueError: The target user is the owner, or already has access.
            PermissionError: The target is the platform test account and
                ``owner`` is not a tester.
        """
        flowsheet = cls.objects.get(id=flowsheet_id, owner=owner)
        user = User.objects.get(email=user_email)

        # Compare identities the same way update_shared_user_access() and
        # remove_user() do, rather than comparing email strings.
        if user.id == owner.id:
            raise ValueError("You cannot share a flowsheet with yourself.")

        if user.email == settings.PLATFORM_TEST_EMAIL and not owner.is_tester:
            raise PermissionError("Only testers can share flowsheets with the platform test account.")

        if AccessTable.objects.filter(user=user, flowsheet=flowsheet).exists():
            raise ValueError("This user already has access to the flowsheet.")

        AccessTable.objects.create(user=user, flowsheet=flowsheet, read_only=read_only)

    @classmethod
    def update_shared_user_access(cls, owner, flowsheet_id, user_email, read_only):
        """Change the ``read_only`` flag of a shared user's access row.

        Returns:
            The updated ``AccessTable`` row.

        Raises:
            Flowsheet.DoesNotExist: No flowsheet with this id is owned by
                ``owner``.
            User.DoesNotExist: No user has the given email address.
            ValueError: The target user is the owner.
            AccessTable.DoesNotExist: The user has no access row for the
                flowsheet.
        """
        flowsheet = cls.objects.get(id=flowsheet_id, owner=owner)
        user = User.objects.get(email=user_email)

        if user.id == owner.id:
            raise ValueError("Cannot modify the owner's access.")

        access_row = AccessTable.objects.get(user=user, flowsheet=flowsheet)
        access_row.read_only = read_only
        access_row.save(update_fields=["read_only"])

        return access_row

    @classmethod
    def get_flowsheets_by_view_type(cls, user, view_type):
        """Return the non-template flowsheets visible to ``user`` for a view.

        ``ViewType.ALL`` selects every flowsheet the user has an access row
        for, ``ViewType.SHARED`` selects those the user can access but does
        not own, and ``ViewType.OWNED`` - as well as any unrecognised view
        type - selects the flowsheets the user owns.
        """
        # Templates are never listed.
        base_query = cls.objects.filter(flowsheet_template_type=FlowsheetTemplateType.NotTemplate)

        if view_type == ViewType.ALL:
            return base_query.filter(access_list__user=user)
        if view_type == ViewType.SHARED:
            return base_query.filter(access_list__user=user).exclude(owner=user)

        # ViewType.OWNED and every unknown view type fall back to owned.
        return base_query.filter(owner=user)

    @classmethod
    def get_shared_users(cls, owner, flowsheet_id):
        """List the users a flowsheet is shared with, excluding the owner.

        Returns:
            A list of ``{"email": ..., "read_only": ...}`` dicts ordered by
            email address.

        Raises:
            Flowsheet.DoesNotExist: No flowsheet with this id is owned by
                ``owner``.
        """
        flowsheet = cls.objects.get(id=flowsheet_id, owner=owner)
        shared_users = (
            AccessTable.objects
            .filter(flowsheet=flowsheet)
            .exclude(user=owner)
            .values("user__email", "read_only")
            .order_by("user__email")
        )
        return [
            {"email": row["user__email"], "read_only": row["read_only"]}
            for row in shared_users
        ]

    @classmethod
    def remove_user(cls, owner, flowsheet_id, user_email):
        """Remove a user from the access list of the flowsheet.

        Raises:
            Flowsheet.DoesNotExist: No flowsheet with this id is owned by
                ``owner``.
            User.DoesNotExist: No user has the given email address.
            ValueError: The target user is the owner.
            AccessTable.DoesNotExist: The user has no access row for the
                flowsheet.
        """
        flowsheet = cls.objects.get(id=flowsheet_id, owner=owner)
        user = User.objects.get(email=user_email)
        if user.id == owner.id:
            raise ValueError("Cannot remove the owner of the flowsheet.")

        AccessTable.objects.get(user=user, flowsheet=flowsheet).delete()