Coverage for backend/django/idaes_factory/adapters/stream_properties.py: 100%

25 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-03-26 20:57 +0000

1from ahuora_builder_types import PropertiesSchema 

2from flowsheetInternals.unitops.models import SimulationObject 

3from .property_info_adapter import SerialisePropertiesAdapter 

4from ..queryset_lookup import get_property 

5 

6 

7""" 

8Stream serialisation using SerialisePropertiesAdapter. 

9Handles material streams, energy streams, and other stream types. 

10""" 

11 

# Module-level adapter instance, created once at import time and used by
# serialise_stream() for every stream it serialises.
_stream_adapter = SerialisePropertiesAdapter()

13 

def should_serialise_stream(stream: SimulationObject, is_inlet: bool) -> bool:
    """
    Decide whether the given end of a stream should be serialised.

    Returns True when the stream has exactly one connected port (i.e. it is
    an inlet or outlet port), or when it has a recycle connection (both the
    inlet, for guesses, and the outlet, for set points, are serialised).
    For any other stream only the outlet end is serialised, so the result
    is ``not is_inlet``.
    """
    # Short-circuit keeps the original order: the recycle flag is only read
    # when the stream does not have exactly one connected port.
    has_single_port = stream.connectedPorts.count() == 1
    if has_single_port or stream.has_recycle_connection:
        return True

    # Intermediate stream: serialise the outlet end only.
    return not is_inlet

27 

28 

def serialise_stream(ctx, stream: SimulationObject, is_inlet: bool) -> PropertiesSchema:
    """
    Serialise the properties of a stream through the shared adapter.

    An empty dict is returned when should_serialise_stream() rejects this
    end of the stream. Otherwise the adapter output is returned, with two
    recycle-specific adjustments:

    * inlet of a recycle stream: the adapter is called with a truthy
      ``is_tear`` flag;
    * outlet of a recycle stream: the adapter is called with a falsy
      ``is_tear`` flag and every property that has at least one value which
      is not both a control set point and externally controlled is dropped
      from the result.
    """
    if not should_serialise_stream(stream, is_inlet):
        return {}

    is_tear_inlet = stream.has_recycle_connection and is_inlet
    is_tear_outlet = stream.has_recycle_connection and not is_inlet

    # Serialise everything first; only the inlet side is flagged as a tear.
    serialised = _stream_adapter.serialise(ctx, stream, is_tear=is_tear_inlet)

    if not is_tear_outlet:
        return serialised

    # Tear outlet: keep a property only if all of its values qualify.
    # A property with no values passes the check and is kept.
    for contained in stream.properties.containedProperties.all():
        every_value_qualifies = all(
            val.is_control_set_point() and val.is_externally_controlled()
            for val in contained.values.all()
        )
        if not every_value_qualifies:
            serialised.pop(contained.key, None)

    return serialised