Coverage for backend/django/core/auxiliary/methods/copy_object/copy_object.py: 92%

125 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1from enum import Enum 

2from typing import Iterable 

3 

4from django.core.exceptions import ObjectDoesNotExist 

5from django.db.models import Model 

6from django.db.models.fields.reverse_related import ManyToOneRel, OneToOneRel 

7 

8from ..copy_flowsheet.copy_caching import ModelLookup, ModelLookupDict 

9 

10 

class BoundSource:
    """
    A source model plus the relationship that led us to it.

    "Bound" here means the source is not just an arbitrary model instance:
    it is tied to a copied parent via `parent_field_name`, so when we clone
    the source we know which forward field should point at the copied parent
    rather than the original parent.

    Root sources (objects taken straight from a queryset rather than reached
    through a parent) leave both `parent_source` and `parent_field_name` as
    `None`.
    """

    def __init__(
        self,
        source: Model,
        parent_source: Model | None = None,
        parent_field_name: str | None = None,
    ):
        # The original instance that is going to be cloned.
        self.source = source
        # The original parent instance we traversed from, if any.
        self.parent_source = parent_source
        # Name of the forward field on `source` that references the parent.
        self.parent_field_name = parent_field_name

    def __repr__(self) -> str:
        """Return a debugging representation listing all three attributes."""
        return (
            f"{type(self).__name__}(source={self.source!r}, "
            f"parent_source={self.parent_source!r}, "
            f"parent_field_name={self.parent_field_name!r})"
        )

24 

25 

class RemapOrRetainMode(Enum):
    """
    Policies for forward relations that are resolved after the copied subset exists.

    These modes are used by `forward_relation_policy`. Both modes point the
    clone at the copied target when the related object was itself copied;
    they differ only in what happens when it was not.
    """

    # Related object not copied: the clone keeps pointing at the original
    # related object.
    COPY_IF_SELECTED_ELSE_KEEP = "copy_if_selected_else_keep"
    # Related object not copied: the relation is not restored on the clone,
    # so a nullable field stays at the `None` it was initialised with.
    COPY_IF_SELECTED_ELSE_NULL = "copy_if_selected_else_null"

35 

36 

class CopyObject:
    """
    Describes how to copy one model type within a recursively traversed object graph.

    Terminology:
    - `copy_relation_to` describes reverse relations that should be traversed
      recursively to discover child objects that also need copying.
    - `forward_relation_policy` describes nullable forward relations (foreign keys) on the
      current model that should be resolved after the copied subset exists.

    Concrete fields are copied by default. The main exceptions are:
    - the FK/O2O field that should point at a copied parent discovered during
      nested traversal
    - nullable forward relations managed by `forward_relation_policy`, which
      are initialised as `None` and resolved safely afterwards
    """

    def __init__(
        self,
        model: type[Model],
        queryset: Iterable[Model] | None = None,
        copy_relation_to: dict[str, "CopyObject"] | None = None,
        forward_relation_policy: dict[str, RemapOrRetainMode] | None = None,
    ):
        """
        Configure one node of the copy graph.

        `model` is the Django model represented by this node.
        `queryset` provides the root objects for top-level copy calls.
        `copy_relation_to` maps reverse relation names to nested `CopyObject`
        definitions for recursive traversal.
        `forward_relation_policy` maps nullable forward field names to policies
        that decide whether a copied row should keep, null, or remap that
        relation once copied targets are known.
        """
        self.model = model
        self.queryset = queryset
        self.copy_relation_to = copy_relation_to or {}
        self.forward_relation_policy = forward_relation_policy or {}

    def copy(
        self,
        model_lookups: ModelLookupDict = None,
        _bound_sources: list[BoundSource] | None = None,
        _pending_updates: list | None = None,
    ) -> list[Model]:
        """
        Copy the configured root queryset, or a nested set of `BoundSource`s.

        Nested calls share both `model_lookups` and `pending_updates` so that:
        - the same original model never gets copied twice
        - relations like `Port.stream` can be remapped after the copied streams
          have been created

        `_bound_sources` and `_pending_updates` are internal: they are only
        supplied by the recursive calls made from this method. When
        `_bound_sources` is `None` the root queryset is used; an explicitly
        empty list means "nothing to copy".

        Returns the copied instances represented by this `CopyObject`.
        Raises `ValueError` if a root copy is requested without a queryset.
        """
        if model_lookups is None:
            model_lookups = {}

        # Only the outermost call (the one that creates the queue) runs the
        # deferred updates; nested calls just append to the shared list.
        owns_pending_updates = _pending_updates is None
        pending_updates = [] if owns_pending_updates else _pending_updates

        # Compare against None rather than relying on truthiness: an empty
        # nested source list must never fall back to the root queryset.
        if _bound_sources is None:
            bound_sources = [BoundSource(instance) for instance in self._get_root_sources()]
        else:
            bound_sources = _bound_sources

        clones = self._copy_bound_sources(bound_sources, model_lookups, pending_updates)

        for relation_name, nested_copy_object in self.copy_relation_to.items():
            child_bound_sources = self._collect_child_sources(bound_sources, relation_name)
            if child_bound_sources:
                nested_copy_object.copy(model_lookups, child_bound_sources, pending_updates)

        if owns_pending_updates:
            self._apply_pending_updates(pending_updates)
        return clones

    def _get_root_sources(self) -> list[Model]:
        """
        Materialise the root queryset once so recursive copy order is stable.

        Raises `ValueError` when this node was configured without a queryset.
        """
        if self.queryset is None:
            raise ValueError(
                f"CopyObject({self.model.__name__}) requires queryset for root copy() calls."
            )
        return list(self.queryset)

    def _copy_bound_sources(
        self,
        bound_sources: list[BoundSource],
        model_lookups: ModelLookupDict,
        pending_updates: list,
    ) -> list[Model]:
        """
        Copy each source at most once for this model type.

        The `ModelLookup` is keyed by the *original* primary key and stores the
        newly created clone. That gives us the same old->new identity mapping
        pattern used by `copy_flowsheet`.

        Returns one clone per bound source, in input order; sources that were
        already copied contribute their existing clone.
        """
        # Create the per-model lookup only when it is missing, instead of
        # building a throwaway `ModelLookup` on every call.
        current_model_lookup = model_lookups.get(self.model)
        if current_model_lookup is None:
            current_model_lookup = model_lookups[self.model] = ModelLookup([])

        clones = []
        for bound_source in bound_sources:
            source = bound_source.source
            existing = current_model_lookup.get_model(source.pk)
            if existing is not None:
                clones.append(existing)
                continue

            clone = self._build_clone(bound_source, model_lookups)
            clone.save()
            # Register the clone before scheduling updates so later sources
            # and deferred relation updates can resolve it.
            current_model_lookup.model_map[source.pk] = clone
            self._schedule_forward_relation_updates(
                source,
                clone,
                model_lookups,
                pending_updates,
            )
            clones.append(clone)
        return clones

    def _build_clone(self, bound_source: BoundSource, model_lookups: ModelLookupDict) -> Model:
        """
        Build a shallow, unsaved clone of the source model.

        Most concrete fields are copied directly from the original object. The
        main exception is the field that points back to the copied parent in a
        nested copy, e.g. `PropertySet.simulationObject` when copying
        `SimulationObject.properties`. Nullable forward relations managed by
        `forward_relation_policy` also skip the original value initially so they can be
        resolved safely once all copied targets exist.

        Raises `ValueError` when the parent of a nested source has not been
        copied yet.
        """
        source = bound_source.source
        clone = self.model()

        for field in source._meta.concrete_fields:
            if field.primary_key:
                # Leave the primary key unset so saving inserts a new row.
                continue

            if bound_source.parent_field_name == field.name:
                # Nested copy relations need to point at the copied parent, not
                # the original parent we traversed from. A missing lookup table
                # and a missing entry are the same failure, so both raise the
                # same descriptive error.
                parent_lookup = model_lookups.get(type(bound_source.parent_source))
                parent_clone = (
                    parent_lookup.get_model(bound_source.parent_source.pk)
                    if parent_lookup is not None
                    else None
                )
                if parent_clone is None:
                    raise ValueError(
                        f"Missing copied parent for {self.model.__name__}.{field.name}."
                    )
                setattr(clone, field.name, parent_clone)
                continue

            if field.name in self.forward_relation_policy and field.null:
                # Defer these relations entirely so subset-copy policies can
                # decide whether to keep, null, or remap them after the copied
                # targets exist. This also avoids unique one-to-one insert
                # failures when the original related row is still in use.
                setattr(clone, field.attname, None)
                continue

            setattr(clone, field.attname, getattr(source, field.attname))

        return clone

    def _collect_child_sources(
        self,
        bound_sources: list[BoundSource],
        relation_name: str,
    ) -> list[BoundSource]:
        """
        Expand a nested copy relation into child sources.

        We only support reverse one-to-one and reverse foreign key relations for
        now, because those are the relations needed by the current simulation
        object duplication shape.

        Raises `ValueError` for any other kind of field.
        """
        field = self.model._meta.get_field(relation_name)

        # `OneToOneRel` is a subclass of `ManyToOneRel`, so it has to be
        # checked first.
        if isinstance(field, OneToOneRel):
            parent_field_name = field.field.name
            children: list[BoundSource] = []
            for bound_source in bound_sources:
                try:
                    child = getattr(bound_source.source, relation_name)
                except ObjectDoesNotExist:
                    # Reverse one-to-one access raises when the child is absent.
                    continue
                if child is not None:
                    children.append(
                        BoundSource(
                            source=child,
                            parent_source=bound_source.source,
                            parent_field_name=parent_field_name,
                        )
                    )
            return children

        if isinstance(field, ManyToOneRel):
            parent_field_name = field.field.name
            return [
                BoundSource(
                    source=child,
                    parent_source=bound_source.source,
                    parent_field_name=parent_field_name,
                )
                for bound_source in bound_sources
                for child in getattr(bound_source.source, relation_name).all()
            ]

        raise ValueError(
            f"copy_relation_to only supports reverse one-to-one or reverse foreign key "
            f"relations. {self.model.__name__}.{relation_name} is not supported."
        )

    def _schedule_forward_relation_updates(
        self,
        source: Model,
        clone: Model,
        model_lookups: ModelLookupDict,
        pending_updates: list,
    ) -> None:
        """
        Defer remap-or-retain fields until all nested copies have been created.

        Example: `Port.stream` should point at the copied stream when that
        stream was part of the selected subset, otherwise it should stay
        connected to the original stream.

        Nothing is scheduled when this node has no `forward_relation_policy`.
        Otherwise a callable is appended to `pending_updates`; when run it
        raises `ValueError` for an unsupported policy value and saves only the
        fields it actually changed.
        """
        if not self.forward_relation_policy:
            return

        supported_modes = (
            RemapOrRetainMode.COPY_IF_SELECTED_ELSE_KEEP,
            RemapOrRetainMode.COPY_IF_SELECTED_ELSE_NULL,
        )

        def update_relations() -> None:
            fields_to_update = []
            for field_name, mode in self.forward_relation_policy.items():
                if mode not in supported_modes:
                    raise ValueError(
                        f"Unsupported forward relation policy '{mode}' for "
                        f"{self.model.__name__}.{field_name}."
                    )

                related_pk = getattr(source, f"{field_name}_id")
                if related_pk is None:
                    # The original had no relation, so there is nothing to resolve.
                    continue

                field = source._meta.get_field(field_name)
                # No lookup means nothing of that related model type was
                # copied; a lookup without the key means this particular
                # object was not selected. Both count as "not copied".
                related_lookup = model_lookups.get(field.related_model)
                related_copy = (
                    related_lookup.get_model(related_pk)
                    if related_lookup is not None
                    else None
                )

                if related_copy is not None:
                    setattr(clone, field.name, related_copy)
                elif mode == RemapOrRetainMode.COPY_IF_SELECTED_ELSE_KEEP:
                    # Re-use the key already read from the source instead of
                    # loading the related row just to write its key back.
                    setattr(clone, field.attname, related_pk)
                else:
                    # COPY_IF_SELECTED_ELSE_NULL: leave the field as
                    # `_build_clone` initialised it.
                    continue
                fields_to_update.append(field.name)

            if fields_to_update:
                clone.save(update_fields=fields_to_update)

        pending_updates.append(update_relations)

    def _apply_pending_updates(self, pending_updates: list) -> None:
        """
        Run the deferred forward relation (foreign key) updates once the full nested object
        graph exists.

        Callables are invoked in the order they were scheduled.
        """
        for update_relations in pending_updates:
            update_relations()