Coverage for backend/django/idaes_factory/adapters/arc_adapter.py: 100%
23 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1from typing import Optional
3from ahuora_builder_types.arc_schema import TearGuessSchema
4from core.auxiliary.enums import ConType
5from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
6from ahuora_builder_types import ArcSchema, FlowsheetSchema
7from ..idaes_factory_context import IdaesFactoryContext
9from ..queryset_lookup import get_property, get_connected_port, get_value_object
def create_arc(ctx: IdaesFactoryContext, stream: SimulationObject) -> Optional[ArcSchema]:
    """Build the arc schema describing a material stream.

    The port looked up with ``ConType.Outlet`` becomes the arc's source and
    the port looked up with ``ConType.Inlet`` becomes its destination.

    Args:
        ctx: Factory context; not read by this function.
        stream: The stream object to convert into an arc.

    Returns:
        An ``ArcSchema`` keyed by the stream's primary key, or ``None`` when
        either port lookup yields ``None``. For streams flagged with
        ``has_recycle_connection`` the schema also carries a tear guess.
    """
    source_port = get_connected_port(stream, direction=ConType.Outlet)
    destination_port = get_connected_port(stream, direction=ConType.Inlet)

    # A stream that is not attached at both ends cannot form an arc.
    if source_port is None:
        return None
    if destination_port is None:
        return None

    arc = ArcSchema(
        id=stream.pk,
        source=source_port.pk,
        destination=destination_port.pk,
    )

    if not stream.has_recycle_connection:
        return arc

    # Recycle streams additionally record which tear variables are fixed.
    arc.tear_guess = create_tear(stream)
    return arc
33def create_tear(stream: SimulationObject) -> dict[str, bool]:
34 """Create a tear for a recycle stream"""
35 # eventually might be nice to use the same keys as in IDAES
36 key_map = {
37 "mole_frac_comp": "mole_frac_comp",
38 "flow_mol": "flow_mol", # because of this one :(
39 "temperature": "temperature",
40 "pressure": "pressure"
41 }
42 tear = {}
44 for prop in stream.properties.ContainedProperties.all():
45 if prop.is_recycle_var():
46 tear[key_map[prop.key]] = prop.recycleConnection.fixed
48 return tear