Coverage for backend/django/core/auxiliary/methods/copy_object/copy_object.py: 92%
125 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1from enum import Enum
2from typing import Iterable
4from django.core.exceptions import ObjectDoesNotExist
5from django.db.models import Model
6from django.db.models.fields.reverse_related import ManyToOneRel, OneToOneRel
8from ..copy_flowsheet.copy_caching import ModelLookup, ModelLookupDict
11class BoundSource:
12 """
13 A source model plus the relationship that led us to it.
15 "Bound" here means the source is not just an arbitrary model instance:
16 it is tied to a copied parent via `parent_field_name`, so when we clone
17 the source we know which forward field should point at the copied parent
18 rather than the original parent.
19 """
20 def __init__(self, source: Model, parent_source: Model = None, parent_field_name: str = None):
21 self.source = source
22 self.parent_source = parent_source
23 self.parent_field_name = parent_field_name
26class RemapOrRetainMode(Enum):
27 """
28 Policies for forward relations that are resolved after the copied subset exists.
30 These modes are used by `forward_relation_policy`.
31 """
33 COPY_IF_SELECTED_ELSE_KEEP = "copy_if_selected_else_keep"
34 COPY_IF_SELECTED_ELSE_NULL = "copy_if_selected_else_null"
37class CopyObject:
38 """
39 Describes how to copy one model type within a recursively traversed object graph.
41 Terminology:
42 - `copy_relation_to` describes reverse relations that should be traversed
43 recursively to discover child objects that also need copying.
44 - `forward_relation_policy` describes nullable forward relations (foreign keys) on the
45 current model that should be resolved after the copied subset exists.
47 Concrete fields are copied by default. The main exceptions are:
48 - the FK/O2O field that should point at a copied parent discovered during
49 nested traversal
50 - nullable forward relations managed by `forward_relation_policy`, which
51 are initialised as `None` and resolved safely afterwards
52 """
53 def __init__(
54 self,
55 model: type[Model],
56 queryset: Iterable[Model] | None = None,
57 copy_relation_to: dict[str, "CopyObject"] | None = None,
58 forward_relation_policy: dict[str, RemapOrRetainMode] | None = None,
59 ):
60 """
61 Configure one node of the copy graph.
63 `model` is the Django model represented by this node.
64 `queryset` provides the root objects for top-level copy calls.
65 `copy_relation_to` maps reverse relation names to nested `CopyObject`
66 definitions for recursive traversal.
67 `forward_relation_policy` maps nullable forward field names to policies
68 that decide whether a copied row should keep, null, or remap that
69 relation once copied targets are known.
70 """
71 self.model = model
72 self.queryset = queryset
73 self.copy_relation_to = copy_relation_to or {}
74 self.forward_relation_policy = forward_relation_policy or {}
76 def copy(
77 self,
78 model_lookups: ModelLookupDict = None,
79 _bound_sources: list[BoundSource] = None,
80 _pending_updates: list = None,
81 ) -> list[Model]:
82 """
83 Copy the configured root queryset, or a nested set of `BoundSource`s.
85 Nested calls share both `model_lookups` and `pending_updates` so that:
86 - the same original model never gets copied twice
87 - relations like `Port.stream` can be remapped after the copied streams
88 have been created
90 Returns the copied instances represented by this `CopyObject`.
91 """
92 if model_lookups is None: 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true
93 model_lookups = {}
95 pending_updates = _pending_updates
96 created_pending_updates = pending_updates is None
97 if pending_updates is None:
98 pending_updates = []
100 bound_sources = _bound_sources or [BoundSource(instance) for instance in self._get_root_sources()]
101 clones = self._copy_bound_sources(bound_sources, model_lookups, pending_updates)
103 for relation_name, nested_copy_object in self.copy_relation_to.items():
104 child_bound_sources = self._collect_child_sources(bound_sources, relation_name)
105 if child_bound_sources:
106 nested_copy_object.copy(model_lookups, child_bound_sources, pending_updates)
108 if created_pending_updates:
109 self._apply_pending_updates(pending_updates)
110 return clones
112 def _get_root_sources(self) -> list[Model]:
113 """
114 Materialise the root queryset once so recursive copy order is stable.
115 """
116 if self.queryset is None: 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true
117 raise ValueError(
118 f"CopyObject({self.model.__name__}) requires queryset for root copy() calls."
119 )
120 return list(self.queryset)
122 def _copy_bound_sources(
123 self,
124 bound_sources: list[BoundSource],
125 model_lookups: ModelLookupDict,
126 pending_updates: list,
127 ) -> list[Model]:
128 """
129 Copy each source at most once for this model type.
131 The `ModelLookup` is keyed by the *original* primary key and stores the
132 newly created clone. That gives us the same old->new identity mapping
133 pattern used by `copy_flowsheet`.
134 """
135 current_model_lookup = model_lookups.setdefault(self.model, ModelLookup([]))
136 clones = []
137 for bound_source in bound_sources:
138 source = bound_source.source
139 existing = current_model_lookup.get_model(source.pk)
140 if existing is not None: 140 ↛ 141line 140 didn't jump to line 141 because the condition on line 140 was never true
141 clones.append(existing)
142 continue
144 clone = self._build_clone(bound_source, model_lookups)
145 clone.save()
146 current_model_lookup.model_map[source.pk] = clone
147 self._schedule_forward_relation_updates(
148 source,
149 clone,
150 model_lookups,
151 pending_updates,
152 )
153 clones.append(clone)
154 return clones
156 def _build_clone(self, bound_source: BoundSource, model_lookups: ModelLookupDict) -> Model:
157 """
158 Build a shallow clone of the source model.
160 Most concrete fields are copied directly from the original object. The
161 main exception is the field that points back to the copied parent in a
162 nested copy, e.g. `PropertySet.simulationObject` when copying
163 `SimulationObject.properties`. Nullable forward relations managed by
164 `forward_relation_policy` also skip the original value initially so they can be
165 resolved safely once all copied targets exist.
166 """
167 source = bound_source.source
168 clone = self.model()
170 for field in source._meta.concrete_fields:
171 if field.primary_key:
172 continue
174 if bound_source.parent_field_name == field.name:
175 # Nested copy relations need to point at the copied parent, not
176 # the original parent we traversed from.
177 parent_lookup = model_lookups[type(bound_source.parent_source)]
178 parent_clone = parent_lookup.get_model(bound_source.parent_source.pk)
179 if parent_clone is None: 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true
180 raise ValueError(
181 f"Missing copied parent for {self.model.__name__}.{field.name}."
182 )
183 setattr(clone, field.name, parent_clone)
184 continue
186 if field.name in self.forward_relation_policy and field.null:
187 # Defer these relations entirely so subset-copy policies can
188 # decide whether to keep, null, or remap them after the copied
189 # targets exist. This also avoids unique one-to-one insert
190 # failures when the original related row is still in use.
191 setattr(clone, field.attname, None)
192 continue
194 setattr(clone, field.attname, getattr(source, field.attname))
196 return clone
198 def _collect_child_sources(
199 self,
200 bound_sources: list[BoundSource],
201 relation_name: str,
202 ) -> list[BoundSource]:
203 """
204 Expand a nested copy relation into child sources.
206 We only support reverse one-to-one and reverse foreign key relations for
207 now, because those are the relations needed by the current simulation
208 object duplication shape.
209 """
210 field = self.model._meta.get_field(relation_name)
211 children: list[BoundSource] = []
213 if isinstance(field, OneToOneRel):
214 parent_field_name = field.field.name
215 for bound_source in bound_sources:
216 try:
217 child = getattr(bound_source.source, relation_name)
218 except ObjectDoesNotExist:
219 # Reverse one-to-one access raises when the child is absent.
220 continue
221 if child is not None: 221 ↛ 215line 221 didn't jump to line 215 because the condition on line 221 was always true
222 children.append(
223 BoundSource(
224 source=child,
225 parent_source=bound_source.source,
226 parent_field_name=parent_field_name,
227 )
228 )
229 return children
231 if isinstance(field, ManyToOneRel): 231 ↛ 244line 231 didn't jump to line 244 because the condition on line 231 was always true
232 parent_field_name = field.field.name
233 for bound_source in bound_sources:
234 for child in getattr(bound_source.source, relation_name).all():
235 children.append(
236 BoundSource(
237 source=child,
238 parent_source=bound_source.source,
239 parent_field_name=parent_field_name,
240 )
241 )
242 return children
244 raise ValueError(
245 f"copy_relation_to only supports reverse one-to-one or reverse foreign key "
246 f"relations. {self.model.__name__}.{relation_name} is not supported."
247 )
249 def _schedule_forward_relation_updates(
250 self,
251 source: Model,
252 clone: Model,
253 model_lookups: ModelLookupDict,
254 pending_updates: list,
255 ) -> None:
256 """
257 Defer remap-or-retain fields until all nested copies have been created.
259 Example: `Port.stream` should point at the copied stream when that
260 stream was part of the selected subset, otherwise it should stay
261 connected to the original stream.
262 """
263 if not self.forward_relation_policy:
264 return
266 def update_relations() -> None:
267 fields_to_update = []
268 for field_name, mode in self.forward_relation_policy.items():
269 if mode not in [ 269 ↛ 273line 269 didn't jump to line 273 because the condition on line 269 was never true
270 RemapOrRetainMode.COPY_IF_SELECTED_ELSE_KEEP,
271 RemapOrRetainMode.COPY_IF_SELECTED_ELSE_NULL,
272 ]:
273 raise ValueError(
274 f"Unsupported forward relation policy '{mode}' for "
275 f"{self.model.__name__}.{field_name}."
276 )
278 related_pk = getattr(source, f"{field_name}_id")
279 if related_pk is None:
280 continue
282 field = source._meta.get_field(field_name)
283 related_lookup = model_lookups.get(field.related_model)
284 if related_lookup is None:
285 # Nothing of that related model type was copied.
286 if mode == RemapOrRetainMode.COPY_IF_SELECTED_ELSE_KEEP: 286 ↛ 289line 286 didn't jump to line 289 because the condition on line 286 was always true
287 setattr(clone, field.name, getattr(source, field.name))
288 fields_to_update.append(field.name)
289 continue
290 related_copy = related_lookup.get_model(related_pk)
291 if related_copy is None:
292 # The related object was not selected.
293 if mode == RemapOrRetainMode.COPY_IF_SELECTED_ELSE_KEEP:
294 setattr(clone, field.name, getattr(source, field.name))
295 fields_to_update.append(field.name)
296 continue
298 setattr(clone, field.name, related_copy)
299 fields_to_update.append(field.name)
301 if fields_to_update:
302 clone.save(update_fields=fields_to_update)
304 pending_updates.append(update_relations)
306 def _apply_pending_updates(self, pending_updates: list) -> None:
307 """
308 Run the deferred forward relation (foreign key) updates once the full nested object
309 graph exists.
310 """
311 for update_relations in pending_updates:
312 update_relations()