1. from math import ceil
    
  2. 
    
  3. from django.db import connection, models
    
  4. from django.db.models import ProtectedError, RestrictedError
    
  5. from django.db.models.deletion import Collector
    
  6. from django.db.models.sql.constants import GET_ITERATOR_CHUNK_SIZE
    
  7. from django.test import TestCase, skipIfDBFeature, skipUnlessDBFeature
    
  8. 
    
  9. from .models import (
    
  10.     B1,
    
  11.     B2,
    
  12.     B3,
    
  13.     MR,
    
  14.     A,
    
  15.     Avatar,
    
  16.     B,
    
  17.     Base,
    
  18.     Child,
    
  19.     DeleteBottom,
    
  20.     DeleteTop,
    
  21.     GenericB1,
    
  22.     GenericB2,
    
  23.     GenericDeleteBottom,
    
  24.     HiddenUser,
    
  25.     HiddenUserProfile,
    
  26.     M,
    
  27.     M2MFrom,
    
  28.     M2MTo,
    
  29.     MRNull,
    
  30.     Origin,
    
  31.     P,
    
  32.     Parent,
    
  33.     R,
    
  34.     RChild,
    
  35.     RChildChild,
    
  36.     Referrer,
    
  37.     S,
    
  38.     T,
    
  39.     User,
    
  40.     create_a,
    
  41.     get_default_r,
    
  42. )
    
  43. 
    
  44. 
    
  class OnDeleteTests(TestCase):
      """
      Check what happens to referring rows when a referenced object is deleted.

      Most tests obtain an ``A`` from ``create_a(<name>)``, delete the object
      behind the attribute of the same name, and assert the resulting state.
      """

      def setUp(self):
          # Compared with the pk of related objects in test_setvalue and
          # test_setdefault.
          self.DEFAULT = get_default_r()

      def test_auto(self):
          """Deleting ``a.auto`` removes the ``A`` row as well."""
          a = create_a("auto")
          a.auto.delete()
          self.assertFalse(A.objects.filter(name="auto").exists())

      def test_non_callable(self):
          """Passing ``on_delete=None`` to a relation field raises TypeError."""
          msg = "on_delete must be callable."
          with self.assertRaisesMessage(TypeError, msg):
              models.ForeignKey("self", on_delete=None)
          with self.assertRaisesMessage(TypeError, msg):
              models.OneToOneField("self", on_delete=None)

      def test_auto_nullable(self):
          """Deleting ``a.auto_nullable`` removes the ``A`` row as well."""
          a = create_a("auto_nullable")
          a.auto_nullable.delete()
          self.assertFalse(A.objects.filter(name="auto_nullable").exists())

      def test_setvalue(self):
          """After deleting ``a.setvalue`` the relation points at DEFAULT."""
          a = create_a("setvalue")
          a.setvalue.delete()
          # Reload to observe the value stored in the database.
          a = A.objects.get(pk=a.pk)
          self.assertEqual(self.DEFAULT, a.setvalue.pk)

      def test_setnull(self):
          """After deleting ``a.setnull`` the relation is None."""
          a = create_a("setnull")
          a.setnull.delete()
          a = A.objects.get(pk=a.pk)
          self.assertIsNone(a.setnull)

      def test_setdefault(self):
          """After deleting ``a.setdefault`` the relation points at DEFAULT."""
          a = create_a("setdefault")
          a.setdefault.delete()
          a = A.objects.get(pk=a.pk)
          self.assertEqual(self.DEFAULT, a.setdefault.pk)

      def test_setdefault_none(self):
          """After deleting ``a.setdefault_none`` the relation is None."""
          a = create_a("setdefault_none")
          a.setdefault_none.delete()
          a = A.objects.get(pk=a.pk)
          self.assertIsNone(a.setdefault_none)

      def test_cascade(self):
          """Deleting ``a.cascade`` removes the ``A`` row as well."""
          a = create_a("cascade")
          a.cascade.delete()
          self.assertFalse(A.objects.filter(name="cascade").exists())

      def test_cascade_nullable(self):
          """Deleting ``a.cascade_nullable`` removes the ``A`` row as well."""
          a = create_a("cascade_nullable")
          a.cascade_nullable.delete()
          self.assertFalse(A.objects.filter(name="cascade_nullable").exists())

      def test_protect(self):
          """Deleting ``a.protect`` raises ProtectedError reporting ``a``."""
          a = create_a("protect")
          msg = (
              "Cannot delete some instances of model 'R' because they are "
              "referenced through protected foreign keys: 'A.protect'."
          )
          with self.assertRaisesMessage(ProtectedError, msg) as cm:
              a.protect.delete()
          self.assertEqual(cm.exception.protected_objects, {a})

      def test_protect_multiple(self):
          """ProtectedError lists every protecting relation and object."""
          a = create_a("protect")
          b = B.objects.create(protect=a.protect)
          msg = (
              "Cannot delete some instances of model 'R' because they are "
              "referenced through protected foreign keys: 'A.protect', "
              "'B.protect'."
          )
          with self.assertRaisesMessage(ProtectedError, msg) as cm:
              a.protect.delete()
          self.assertEqual(cm.exception.protected_objects, {a, b})

      def test_protect_path(self):
          """Deleting the ``P`` behind ``a.protect`` raises ProtectedError."""
          a = create_a("protect")
          a.protect.p = P.objects.create()
          a.protect.save()
          msg = (
              "Cannot delete some instances of model 'P' because they are "
              "referenced through protected foreign keys: 'R.p'."
          )
          with self.assertRaisesMessage(ProtectedError, msg) as cm:
              a.protect.p.delete()
          self.assertEqual(cm.exception.protected_objects, {a})

      def test_do_nothing(self):
          """The ``donothing`` relation keeps the value set by a pre_delete hook."""
          # Testing DO_NOTHING is a bit harder: It would raise IntegrityError for
          # a normal model, so we connect to pre_delete and set the fk to a known
          # value.
          replacement_r = R.objects.create()

          def check_do_nothing(sender, **kwargs):
              obj = kwargs["instance"]
              obj.donothing_set.update(donothing=replacement_r)

          models.signals.pre_delete.connect(check_do_nothing)
          # Registered as a cleanup so the receiver is removed even when the
          # delete or the assertion below fails.
          self.addCleanup(models.signals.pre_delete.disconnect, check_do_nothing)
          a = create_a("do_nothing")
          a.donothing.delete()
          a = A.objects.get(pk=a.pk)
          self.assertEqual(replacement_r, a.donothing)

      def test_do_nothing_qscount(self):
          """
          A models.DO_NOTHING relation doesn't trigger a query.
          """
          b = Base.objects.create()
          with self.assertNumQueries(1):
              # RelToBase should not be queried.
              b.delete()
          self.assertEqual(Base.objects.count(), 0)

      def test_inheritance_cascade_up(self):
          """Deleting an ``RChild`` also removes the ``R`` row with the same pk."""
          child = RChild.objects.create()
          child.delete()
          self.assertFalse(R.objects.filter(pk=child.pk).exists())

      def test_inheritance_cascade_down(self):
          """Deleting ``child.r_ptr`` also removes the ``RChild`` row."""
          child = RChild.objects.create()
          parent = child.r_ptr
          parent.delete()
          self.assertFalse(RChild.objects.filter(pk=child.pk).exists())

      def test_cascade_from_child(self):
          """Deleting ``a.child`` removes both the ``A`` row and the ``R`` row."""
          a = create_a("child")
          a.child.delete()
          self.assertFalse(A.objects.filter(name="child").exists())
          self.assertFalse(R.objects.filter(pk=a.child_id).exists())

      def test_cascade_from_parent(self):
          """Deleting the ``R`` with pk ``a.child_id`` removes ``A`` and ``RChild``."""
          a = create_a("child")
          R.objects.get(pk=a.child_id).delete()
          self.assertFalse(A.objects.filter(name="child").exists())
          self.assertFalse(RChild.objects.filter(pk=a.child_id).exists())

      def test_setnull_from_child(self):
          """Deleting ``a.child_setnull`` removes its ``R`` row and nulls the fk."""
          a = create_a("child_setnull")
          a.child_setnull.delete()
          self.assertFalse(R.objects.filter(pk=a.child_setnull_id).exists())

          a = A.objects.get(pk=a.pk)
          self.assertIsNone(a.child_setnull)

      def test_setnull_from_parent(self):
          """Deleting the matching ``R`` removes the ``RChild`` and nulls the fk."""
          a = create_a("child_setnull")
          R.objects.get(pk=a.child_setnull_id).delete()
          self.assertFalse(RChild.objects.filter(pk=a.child_setnull_id).exists())

          a = A.objects.get(pk=a.pk)
          self.assertIsNone(a.child_setnull)

      def test_o2o_setnull(self):
          """After deleting ``a.o2o_setnull`` the relation is None."""
          a = create_a("o2o_setnull")
          a.o2o_setnull.delete()
          a = A.objects.get(pk=a.pk)
          self.assertIsNone(a.o2o_setnull)

      def test_restrict(self):
          """Deleting ``a.restrict`` raises RestrictedError reporting ``a``."""
          a = create_a("restrict")
          msg = (
              "Cannot delete some instances of model 'R' because they are "
              "referenced through restricted foreign keys: 'A.restrict'."
          )
          with self.assertRaisesMessage(RestrictedError, msg) as cm:
              a.restrict.delete()
          self.assertEqual(cm.exception.restricted_objects, {a})

      def test_restrict_multiple(self):
          """RestrictedError lists every restricting relation and object."""
          a = create_a("restrict")
          b3 = B3.objects.create(restrict=a.restrict)
          msg = (
              "Cannot delete some instances of model 'R' because they are "
              "referenced through restricted foreign keys: 'A.restrict', "
              "'B3.restrict'."
          )
          with self.assertRaisesMessage(RestrictedError, msg) as cm:
              a.restrict.delete()
          self.assertEqual(cm.exception.restricted_objects, {a, b3})

      def test_restrict_path_cascade_indirect(self):
          """
          Deleting the ``P`` behind ``a.restrict`` is refused until ``a.cascade``
          is attached to the same ``P``; then the delete removes ``A`` and ``R``.
          """
          a = create_a("restrict")
          a.restrict.p = P.objects.create()
          a.restrict.save()
          msg = (
              "Cannot delete some instances of model 'P' because they are "
              "referenced through restricted foreign keys: 'A.restrict'."
          )
          with self.assertRaisesMessage(RestrictedError, msg) as cm:
              a.restrict.p.delete()
          self.assertEqual(cm.exception.restricted_objects, {a})
          # Object referenced also with CASCADE relationship can be deleted.
          a.cascade.p = a.restrict.p
          a.cascade.save()
          a.restrict.p.delete()
          self.assertFalse(A.objects.filter(name="restrict").exists())
          self.assertFalse(R.objects.filter(pk=a.restrict_id).exists())

      def test_restrict_path_cascade_direct(self):
          """
          With ``a.cascade_p`` set to the ``P`` behind ``a.restrict``, deleting
          that ``P`` succeeds and removes both ``A`` and ``R``.
          """
          a = create_a("restrict")
          a.restrict.p = P.objects.create()
          a.restrict.save()
          a.cascade_p = a.restrict.p
          a.save()
          a.restrict.p.delete()
          self.assertFalse(A.objects.filter(name="restrict").exists())
          self.assertFalse(R.objects.filter(pk=a.restrict_id).exists())

      def test_restrict_path_cascade_indirect_diamond(self):
          """
          Deleting ``b1`` alone is refused and leaves every row in place, while
          deleting ``delete_top`` removes the whole DeleteTop/B1/B2/DeleteBottom
          graph.
          """
          delete_top = DeleteTop.objects.create()
          b1 = B1.objects.create(delete_top=delete_top)
          b2 = B2.objects.create(delete_top=delete_top)
          delete_bottom = DeleteBottom.objects.create(b1=b1, b2=b2)
          msg = (
              "Cannot delete some instances of model 'B1' because they are "
              "referenced through restricted foreign keys: 'DeleteBottom.b1'."
          )
          with self.assertRaisesMessage(RestrictedError, msg) as cm:
              b1.delete()
          self.assertEqual(cm.exception.restricted_objects, {delete_bottom})
          # The refused delete must not have removed anything.
          self.assertTrue(DeleteTop.objects.exists())
          self.assertTrue(B1.objects.exists())
          self.assertTrue(B2.objects.exists())
          self.assertTrue(DeleteBottom.objects.exists())
          # Object referenced also with CASCADE relationship can be deleted.
          delete_top.delete()
          self.assertFalse(DeleteTop.objects.exists())
          self.assertFalse(B1.objects.exists())
          self.assertFalse(B2.objects.exists())
          self.assertFalse(DeleteBottom.objects.exists())

      def test_restrict_gfk_no_fast_delete(self):
          """
          Same scenario as the diamond test, built from the Generic* models:
          deleting ``generic_b1`` is refused, deleting ``delete_top`` removes
          everything.
          """
          delete_top = DeleteTop.objects.create()
          generic_b1 = GenericB1.objects.create(generic_delete_top=delete_top)
          generic_b2 = GenericB2.objects.create(generic_delete_top=delete_top)
          generic_delete_bottom = GenericDeleteBottom.objects.create(
              generic_b1=generic_b1,
              generic_b2=generic_b2,
          )
          msg = (
              "Cannot delete some instances of model 'GenericB1' because they "
              "are referenced through restricted foreign keys: "
              "'GenericDeleteBottom.generic_b1'."
          )
          with self.assertRaisesMessage(RestrictedError, msg) as cm:
              generic_b1.delete()
          self.assertEqual(cm.exception.restricted_objects, {generic_delete_bottom})
          # The refused delete must not have removed anything.
          self.assertTrue(DeleteTop.objects.exists())
          self.assertTrue(GenericB1.objects.exists())
          self.assertTrue(GenericB2.objects.exists())
          self.assertTrue(GenericDeleteBottom.objects.exists())
          # Object referenced also with CASCADE relationship can be deleted.
          delete_top.delete()
          self.assertFalse(DeleteTop.objects.exists())
          self.assertFalse(GenericB1.objects.exists())
          self.assertFalse(GenericB2.objects.exists())
          self.assertFalse(GenericDeleteBottom.objects.exists())
    
  305. 
    
  306. 
    
  class DeletionTests(TestCase):
      """
      Tests for ``Model.delete()`` / ``QuerySet.delete()``: rejected calls,
      many-to-many clean-up, signal ordering, query counts and return values.
      """

      def test_sliced_queryset(self):
          """Calling delete() on a sliced queryset raises TypeError."""
          msg = "Cannot use 'limit' or 'offset' with delete()."
          with self.assertRaisesMessage(TypeError, msg):
              M.objects.all()[0:5].delete()

      def test_pk_none(self):
          """Calling delete() on an unsaved instance raises ValueError."""
          m = M()
          msg = "M object can't be deleted because its id attribute is set to None."
          with self.assertRaisesMessage(ValueError, msg):
              m.delete()

      def test_m2m(self):
          """Deleting either end of an m2m removes the rows linking them."""
          # Explicit MR rows, deleting the R side and then the M side.
          m = M.objects.create()
          r = R.objects.create()
          MR.objects.create(m=m, r=r)
          r.delete()
          self.assertFalse(MR.objects.exists())

          r = R.objects.create()
          MR.objects.create(m=m, r=r)
          m.delete()
          self.assertFalse(MR.objects.exists())

          # Rows created through m.m2m.add(), again deleting each side.
          m = M.objects.create()
          r = R.objects.create()
          m.m2m.add(r)
          r.delete()
          through = M._meta.get_field("m2m").remote_field.through
          self.assertFalse(through.objects.exists())

          r = R.objects.create()
          m.m2m.add(r)
          m.delete()
          self.assertFalse(through.objects.exists())

          # The MRNull row survives the delete of its R, but is no longer
          # reachable through m.m2m_through_null.
          m = M.objects.create()
          r = R.objects.create()
          MRNull.objects.create(m=m, r=r)
          r.delete()
          self.assertTrue(MRNull.objects.exists())
          self.assertFalse(m.m2m_through_null.exists())

      def test_bulk(self):
          """Deleting an ``S`` with two chunks of related ``T`` takes 5 queries."""
          s = S.objects.create(r=R.objects.create())
          for _ in range(2 * GET_ITERATOR_CHUNK_SIZE):
              T.objects.create(s=s)
          #   1 (select related `T` instances)
          # + 1 (select related `U` instances)
          # + 2 (delete `T` instances in batches)
          # + 1 (delete `s`)
          self.assertNumQueries(5, s.delete)
          self.assertFalse(S.objects.exists())

      def test_instance_update(self):
          """
          Instances seen by pre_delete end up with ``pk`` None, and the ``A``
          rows that referenced a deleted ``R`` via ``setnull`` are nulled.
          """
          deleted = []
          related_setnull_sets = []

          def pre_delete(sender, **kwargs):
              obj = kwargs["instance"]
              deleted.append(obj)
              if isinstance(obj, R):
                  # Remember which A rows pointed at this R before the delete.
                  related_setnull_sets.append([a.pk for a in obj.setnull_set.all()])

          models.signals.pre_delete.connect(pre_delete)
          # Registered as a cleanup so a failure below cannot leak the receiver.
          self.addCleanup(models.signals.pre_delete.disconnect, pre_delete)
          a = create_a("update_setnull")
          a.setnull.delete()

          a = create_a("update_cascade")
          a.cascade.delete()

          for obj in deleted:
              self.assertIsNone(obj.pk)

          for pk_list in related_setnull_sets:
              for a in A.objects.filter(id__in=pk_list):
                  self.assertIsNone(a.setnull)

      def test_deletion_order(self):
          """Record and assert the order of pre_delete and post_delete signals."""
          pre_delete_order = []
          post_delete_order = []

          def log_pre_delete(sender, **kwargs):
              pre_delete_order.append((sender, kwargs["instance"].pk))

          def log_post_delete(sender, **kwargs):
              post_delete_order.append((sender, kwargs["instance"].pk))

          models.signals.pre_delete.connect(log_pre_delete)
          self.addCleanup(models.signals.pre_delete.disconnect, log_pre_delete)
          models.signals.post_delete.connect(log_post_delete)
          self.addCleanup(models.signals.post_delete.disconnect, log_post_delete)

          r = R.objects.create(pk=1)
          s1 = S.objects.create(pk=1, r=r)
          s2 = S.objects.create(pk=2, r=r)
          T.objects.create(pk=1, s=s1)
          T.objects.create(pk=2, s=s2)
          RChild.objects.create(r_ptr=r)
          r.delete()
          # Within a model, pre_delete is observed in ascending pk order ...
          self.assertEqual(
              pre_delete_order, [(T, 1), (T, 2), (RChild, 1), (S, 1), (S, 2), (R, 1)]
          )
          # ... and post_delete in descending pk order.
          self.assertEqual(
              post_delete_order, [(T, 2), (T, 1), (RChild, 1), (S, 2), (S, 1), (R, 1)]
          )

      def test_relational_post_delete_signals_happen_before_parent_object(self):
          """When ``S`` gets its post_delete signal, its ``R`` row still exists."""
          deletions = []

          def log_post_delete(instance, **kwargs):
              self.assertTrue(R.objects.filter(pk=instance.r_id).exists())
              self.assertIs(type(instance), S)
              deletions.append(instance.id)

          r = R.objects.create(pk=1)
          S.objects.create(pk=1, r=r)

          models.signals.post_delete.connect(log_post_delete, sender=S)

          try:
              r.delete()
          finally:
              # disconnect() must name the sender used in connect(), otherwise
              # the receiver is not found and stays registered.
              models.signals.post_delete.disconnect(log_post_delete, sender=S)

          self.assertEqual(len(deletions), 1)
          self.assertEqual(deletions[0], 1)

      @skipUnlessDBFeature("can_defer_constraint_checks")
      def test_can_defer_constraint_checks(self):
          """Deleting an avatar with one user takes 3 queries."""
          u = User.objects.create(avatar=Avatar.objects.create())
          a = Avatar.objects.get(pk=u.avatar_id)
          # 1 query to find the users for the avatar.
          # 1 query to delete the user
          # 1 query to delete the avatar
          # The important thing is that when we can defer constraint checks there
          # is no need to do an UPDATE on User.avatar to null it out.

          # Attach a signal to make sure we will not do fast_deletes.
          calls = []

          def noop(*args, **kwargs):
              calls.append("")

          models.signals.post_delete.connect(noop, sender=User)
          # Registered as a cleanup so a failure below cannot leak the receiver.
          self.addCleanup(models.signals.post_delete.disconnect, noop, sender=User)

          self.assertNumQueries(3, a.delete)
          self.assertFalse(User.objects.exists())
          self.assertFalse(Avatar.objects.exists())
          self.assertEqual(len(calls), 1)

      @skipIfDBFeature("can_defer_constraint_checks")
      def test_cannot_defer_constraint_checks(self):
          """Deleting an avatar with one user takes 4 queries."""
          u = User.objects.create(avatar=Avatar.objects.create())
          # Attach a signal to make sure we will not do fast_deletes.
          calls = []

          def noop(*args, **kwargs):
              calls.append("")

          models.signals.post_delete.connect(noop, sender=User)
          # Registered as a cleanup so a failure below cannot leak the receiver.
          self.addCleanup(models.signals.post_delete.disconnect, noop, sender=User)

          a = Avatar.objects.get(pk=u.avatar_id)
          # The below doesn't make sense... Why do we need to null out
          # user.avatar if we are going to delete the user immediately after it,
          # and there are no more cascades.
          # 1 query to find the users for the avatar.
          # 1 query to delete the user
          # 1 query to null out user.avatar, because we can't defer the constraint
          # 1 query to delete the avatar
          self.assertNumQueries(4, a.delete)
          self.assertFalse(User.objects.exists())
          self.assertFalse(Avatar.objects.exists())
          self.assertEqual(len(calls), 1)

      def test_hidden_related(self):
          """Deleting ``r`` also removes the HiddenUserProfile reached via it."""
          r = R.objects.create()
          h = HiddenUser.objects.create(r=r)
          HiddenUserProfile.objects.create(user=h)

          r.delete()
          self.assertEqual(HiddenUserProfile.objects.count(), 0)

      def test_large_delete(self):
          """Deleting 2000 avatars issues the computed number of queries."""
          TEST_SIZE = 2000
          objs = [Avatar() for _ in range(TEST_SIZE)]
          Avatar.objects.bulk_create(objs)
          # Calculate the number of queries needed. Clamp to 1, as
          # test_large_delete_related does, so the division below is safe.
          batch_size = max(connection.ops.bulk_batch_size(["pk"], objs), 1)
          # The related fetches are done in batches.
          batches = ceil(len(objs) / batch_size)
          # One query for Avatar.objects.all() and then one related fast delete for
          # each batch.
          fetches_to_mem = 1 + batches
          # The Avatar objects are going to be deleted in batches of
          # GET_ITERATOR_CHUNK_SIZE.
          queries = fetches_to_mem + TEST_SIZE // GET_ITERATOR_CHUNK_SIZE
          self.assertNumQueries(queries, Avatar.objects.all().delete)
          self.assertFalse(Avatar.objects.exists())

      def test_large_delete_related(self):
          """Deleting an ``S`` with 2000 ``T`` rows issues the computed queries."""
          TEST_SIZE = 2000
          s = S.objects.create(r=R.objects.create())
          for _ in range(TEST_SIZE):
              T.objects.create(s=s)

          batch_size = max(connection.ops.bulk_batch_size(["pk"], range(TEST_SIZE)), 1)

          # TEST_SIZE / batch_size (select related `T` instances)
          # + 1 (select related `U` instances)
          # + TEST_SIZE / GET_ITERATOR_CHUNK_SIZE (delete `T` instances in batches)
          # + 1 (delete `s`)
          expected_num_queries = ceil(TEST_SIZE / batch_size)
          expected_num_queries += ceil(TEST_SIZE / GET_ITERATOR_CHUNK_SIZE) + 2

          self.assertNumQueries(expected_num_queries, s.delete)
          self.assertFalse(S.objects.exists())
          self.assertFalse(T.objects.exists())

      def test_delete_with_keeping_parents(self):
          """``delete(keep_parents=True)`` removes the child row, not its ``R``."""
          child = RChild.objects.create()
          parent_id = child.r_ptr_id
          child.delete(keep_parents=True)
          self.assertFalse(RChild.objects.filter(id=child.id).exists())
          self.assertTrue(R.objects.filter(id=parent_id).exists())

      def test_delete_with_keeping_parents_relationships(self):
          """
          ``delete(keep_parents=True)`` also keeps objects that reference the
          retained parent, for one and for two levels of inheritance.
          """
          child = RChild.objects.create()
          parent_id = child.r_ptr_id
          parent_referent_id = S.objects.create(r=child.r_ptr).pk
          child.delete(keep_parents=True)
          self.assertFalse(RChild.objects.filter(id=child.id).exists())
          self.assertTrue(R.objects.filter(id=parent_id).exists())
          self.assertTrue(S.objects.filter(pk=parent_referent_id).exists())

          childchild = RChildChild.objects.create()
          parent_id = childchild.rchild_ptr.r_ptr_id
          child_id = childchild.rchild_ptr_id
          parent_referent_id = S.objects.create(r=childchild.rchild_ptr.r_ptr).pk
          childchild.delete(keep_parents=True)
          self.assertFalse(RChildChild.objects.filter(id=childchild.id).exists())
          self.assertTrue(RChild.objects.filter(id=child_id).exists())
          self.assertTrue(R.objects.filter(id=parent_id).exists())
          self.assertTrue(S.objects.filter(pk=parent_referent_id).exists())

      def test_queryset_delete_returns_num_rows(self):
          """
          QuerySet.delete() should return the number of deleted rows and a
          dictionary with the number of deletions for each object type.
          """
          Avatar.objects.bulk_create(
              [Avatar(desc="a"), Avatar(desc="b"), Avatar(desc="c")]
          )
          avatars_count = Avatar.objects.count()
          # Only the total is checked here; the per-model mapping is unused.
          deleted, _ = Avatar.objects.all().delete()
          self.assertEqual(deleted, avatars_count)

          # more complex example with multiple object types
          r = R.objects.create()
          h1 = HiddenUser.objects.create(r=r)
          HiddenUser.objects.create(r=r)
          HiddenUserProfile.objects.create(user=h1)
          existed_objs = {
              R._meta.label: R.objects.count(),
              HiddenUser._meta.label: HiddenUser.objects.count(),
              HiddenUserProfile._meta.label: HiddenUserProfile.objects.count(),
          }
          deleted, deleted_objs = R.objects.all().delete()
          self.assertCountEqual(deleted_objs.keys(), existed_objs.keys())
          for k, v in existed_objs.items():
              self.assertEqual(deleted_objs[k], v)

      def test_model_delete_returns_num_rows(self):
          """
          Model.delete() should return the number of deleted rows and a
          dictionary with the number of deletions for each object type.
          """
          r = R.objects.create()
          h1 = HiddenUser.objects.create(r=r)
          h2 = HiddenUser.objects.create(r=r)
          HiddenUser.objects.create(r=r)
          HiddenUserProfile.objects.create(user=h1)
          HiddenUserProfile.objects.create(user=h2)
          m1 = M.objects.create()
          m2 = M.objects.create()
          MR.objects.create(r=r, m=m1)
          r.m_set.add(m1)
          r.m_set.add(m2)
          r.save()
          # Snapshot the row counts before the delete, keyed by model label.
          existed_objs = {
              R._meta.label: R.objects.count(),
              HiddenUser._meta.label: HiddenUser.objects.count(),
              MR._meta.label: MR.objects.count(),
              HiddenUserProfile._meta.label: HiddenUserProfile.objects.count(),
              M.m2m.through._meta.label: M.m2m.through.objects.count(),
          }
          deleted, deleted_objs = r.delete()
          self.assertEqual(deleted, sum(existed_objs.values()))
          self.assertCountEqual(deleted_objs.keys(), existed_objs.keys())
          for k, v in existed_objs.items():
              self.assertEqual(deleted_objs[k], v)

      def test_proxied_model_duplicate_queries(self):
          """
          #25685 - Deleting instances of a model with existing proxy
          classes should not issue multiple queries during cascade
          deletion of referring models.
          """
          avatar = Avatar.objects.create()
          # One query for the Avatar table and a second for the User one.
          with self.assertNumQueries(2):
              avatar.delete()

      def test_only_referenced_fields_selected(self):
          """
          Only referenced fields are selected during cascade deletion SELECT
          unless deletion signals are connected.
          """
          origin = Origin.objects.create()
          expected_sql = str(
              Referrer.objects.only(
                  # Both fields are referenced by SecondReferrer.
                  "id",
                  "unique_field",
              )
              .filter(origin__in=[origin])
              .query
          )
          with self.assertNumQueries(2) as ctx:
              origin.delete()
          self.assertEqual(ctx.captured_queries[0]["sql"], expected_sql)

          def receiver(instance, **kwargs):
              pass

          # All fields are selected if deletion signals are connected.
          for signal_name in ("pre_delete", "post_delete"):
              with self.subTest(signal=signal_name):
                  origin = Origin.objects.create()
                  signal = getattr(models.signals, signal_name)
                  signal.connect(receiver, sender=Referrer)
                  try:
                      with self.assertNumQueries(2) as ctx:
                          origin.delete()
                      self.assertIn(
                          connection.ops.quote_name("large_field"),
                          ctx.captured_queries[0]["sql"],
                      )
                  finally:
                      # A failing sub-test must not leave the receiver connected
                      # for the next iteration.
                      signal.disconnect(receiver, sender=Referrer)
    
  661. 
    
  662. class FastDeleteTests(TestCase):
    
  663.     def test_fast_delete_all(self):
    
  664.         with self.assertNumQueries(1) as ctx:
    
  665.             User.objects.all().delete()
    
  666.         sql = ctx.captured_queries[0]["sql"]
    
  667.         # No subqueries is used when performing a full delete.
    
  668.         self.assertNotIn("SELECT", sql)
    
  669. 
    
  670.     def test_fast_delete_fk(self):
    
  671.         u = User.objects.create(avatar=Avatar.objects.create())
    
  672.         a = Avatar.objects.get(pk=u.avatar_id)
    
  673.         # 1 query to fast-delete the user
    
  674.         # 1 query to delete the avatar
    
  675.         self.assertNumQueries(2, a.delete)
    
  676.         self.assertFalse(User.objects.exists())
    
  677.         self.assertFalse(Avatar.objects.exists())
    
  678. 
    
  679.     def test_fast_delete_m2m(self):
    
  680.         t = M2MTo.objects.create()
    
  681.         f = M2MFrom.objects.create()
    
  682.         f.m2m.add(t)
    
  683.         # 1 to delete f, 1 to fast-delete m2m for f
    
  684.         self.assertNumQueries(2, f.delete)
    
  685. 
    
  686.     def test_fast_delete_revm2m(self):
    
  687.         t = M2MTo.objects.create()
    
  688.         f = M2MFrom.objects.create()
    
  689.         f.m2m.add(t)
    
  690.         # 1 to delete t, 1 to fast-delete t's m_set
    
  691.         self.assertNumQueries(2, f.delete)
    
  692. 
    
  693.     def test_fast_delete_qs(self):
    
  694.         u1 = User.objects.create()
    
  695.         u2 = User.objects.create()
    
  696.         self.assertNumQueries(1, User.objects.filter(pk=u1.pk).delete)
    
  697.         self.assertEqual(User.objects.count(), 1)
    
  698.         self.assertTrue(User.objects.filter(pk=u2.pk).exists())
    
  699. 
    
  700.     def test_fast_delete_instance_set_pk_none(self):
    
  701.         u = User.objects.create()
    
  702.         # User can be fast-deleted.
    
  703.         collector = Collector(using="default")
    
  704.         self.assertTrue(collector.can_fast_delete(u))
    
  705.         u.delete()
    
  706.         self.assertIsNone(u.pk)
    
  707. 
    
  708.     def test_fast_delete_joined_qs(self):
    
  709.         a = Avatar.objects.create(desc="a")
    
  710.         User.objects.create(avatar=a)
    
  711.         u2 = User.objects.create()
    
  712.         self.assertNumQueries(1, User.objects.filter(avatar__desc="a").delete)
    
  713.         self.assertEqual(User.objects.count(), 1)
    
  714.         self.assertTrue(User.objects.filter(pk=u2.pk).exists())
    
  715. 
    
  716.     def test_fast_delete_inheritance(self):
    
  717.         c = Child.objects.create()
    
  718.         p = Parent.objects.create()
    
  719.         # 1 for self, 1 for parent
    
  720.         self.assertNumQueries(2, c.delete)
    
  721.         self.assertFalse(Child.objects.exists())
    
  722.         self.assertEqual(Parent.objects.count(), 1)
    
  723.         self.assertEqual(Parent.objects.filter(pk=p.pk).count(), 1)
    
  724.         # 1 for self delete, 1 for fast delete of empty "child" qs.
    
  725.         self.assertNumQueries(2, p.delete)
    
  726.         self.assertFalse(Parent.objects.exists())
    
  727.         # 1 for self delete, 1 for fast delete of empty "child" qs.
    
  728.         c = Child.objects.create()
    
  729.         p = c.parent_ptr
    
  730.         self.assertNumQueries(2, p.delete)
    
  731.         self.assertFalse(Parent.objects.exists())
    
  732.         self.assertFalse(Child.objects.exists())
    
  733. 
    
  734.     def test_fast_delete_large_batch(self):
    
  735.         User.objects.bulk_create(User() for i in range(0, 2000))
    
  736.         # No problems here - we aren't going to cascade, so we will fast
    
  737.         # delete the objects in a single query.
    
  738.         self.assertNumQueries(1, User.objects.all().delete)
    
  739.         a = Avatar.objects.create(desc="a")
    
  740.         User.objects.bulk_create(User(avatar=a) for i in range(0, 2000))
    
  741.         # We don't hit parameter amount limits for a, so just one query for
    
  742.         # that + fast delete of the related objs.
    
  743.         self.assertNumQueries(2, a.delete)
    
  744.         self.assertEqual(User.objects.count(), 0)
    
  745. 
    
  746.     def test_fast_delete_empty_no_update_can_self_select(self):
    
  747.         """
    
  748.         Fast deleting when DatabaseFeatures.update_can_self_select = False
    
  749.         works even if the specified filter doesn't match any row (#25932).
    
  750.         """
    
  751.         with self.assertNumQueries(1):
    
  752.             self.assertEqual(
    
  753.                 User.objects.filter(avatar__desc="missing").delete(),
    
  754.                 (0, {}),
    
  755.             )
    
  756. 
    
  757.     def test_fast_delete_combined_relationships(self):
    
  758.         # The cascading fast-delete of SecondReferrer should be combined
    
  759.         # in a single DELETE WHERE referrer_id OR unique_field.
    
  760.         origin = Origin.objects.create()
    
  761.         referer = Referrer.objects.create(origin=origin, unique_field=42)
    
  762.         with self.assertNumQueries(2):
    
  763.             referer.delete()
    
  764. 
    
  765.     def test_fast_delete_aggregation(self):
    
  766.         # Fast-deleting when filtering against an aggregation result in
    
  767.         # a single query containing a subquery.
    
  768.         Base.objects.create()
    
  769.         with self.assertNumQueries(1):
    
  770.             self.assertEqual(
    
  771.                 Base.objects.annotate(
    
  772.                     rels_count=models.Count("rels"),
    
  773.                 )
    
  774.                 .filter(rels_count=0)
    
  775.                 .delete(),
    
  776.                 (1, {"delete.Base": 1}),
    
  777.             )
    
  778.         self.assertIs(Base.objects.exists(), False)