Coverage for backend/django/diagnostics/models.py: 100%
31 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-02-12 01:47 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-02-12 01:47 +0000
1from __future__ import annotations
3from pydantic import JsonValue
5from django.db import models
7from core.auxiliary.models.Flowsheet import Flowsheet
8from core.auxiliary.models.Task import Task
9from core.managers import AccessControlManager
10from diagnostics.constants import DiagnosticRunStatus, DiagnosticTrigger
13class DiagnosticRun(models.Model):
14 objects = AccessControlManager()
15 flowsheet = models.ForeignKey(Flowsheet, on_delete=models.CASCADE, related_name="diagnostic_runs")
16 task = models.ForeignKey(Task, null=True, on_delete=models.SET_NULL, related_name="diagnostic_runs")
17 diagnostic_status = models.CharField(max_length=32, choices=DiagnosticRunStatus.choices)
18 trigger = models.CharField(max_length=32, choices=DiagnosticTrigger.choices)
19 summary = models.JSONField(null=True)
20 trace_id = models.CharField(max_length=128, null=True)
21 started_at = models.DateTimeField(null=True)
22 completed_at = models.DateTimeField(null=True)
23 created_at = models.DateTimeField(auto_now_add=True)
24 updated_at = models.DateTimeField(auto_now=True)
26 def append_event(self, event_type: str, data: dict[str, JsonValue]) -> int:
27 """Append an event to this diagnostic run. Returns the event ID."""
28 event = DiagnosticEvent.objects.create(
29 flowsheet=self.flowsheet,
30 run=self,
31 type=event_type,
32 data=data,
33 )
34 return event.id
37class DiagnosticEvent(models.Model):
38 objects = AccessControlManager()
39 flowsheet = models.ForeignKey(Flowsheet, on_delete=models.CASCADE, related_name="diagnostic_events")
40 run = models.ForeignKey(DiagnosticRun, on_delete=models.CASCADE, related_name="events")
41 ts = models.DateTimeField(auto_now_add=True)
42 type = models.CharField(max_length=64)
43 data = models.JSONField(null=True)
45 class Meta:
46 ordering = ["run_id", "id"]