1. from unittest import mock
    
  2. 
    
  3. from django.apps.registry import Apps
    
  4. from django.db import models
    
  5. from django.db.models import signals
    
  6. from django.dispatch import receiver
    
  7. from django.test import SimpleTestCase, TestCase
    
  8. from django.test.utils import isolate_apps
    
  9. 
    
  10. from .models import Author, Book, Car, Page, Person
    
  11. 
    
  12. 
    
  13. class BaseSignalSetup:
    
  14.     def setUp(self):
    
  15.         # Save up the number of connected signals so that we can check at the
    
  16.         # end that all the signals we register get properly unregistered (#9989)
    
  17.         self.pre_signals = (
    
  18.             len(signals.pre_save.receivers),
    
  19.             len(signals.post_save.receivers),
    
  20.             len(signals.pre_delete.receivers),
    
  21.             len(signals.post_delete.receivers),
    
  22.         )
    
  23. 
    
  24.     def tearDown(self):
    
  25.         # All our signals got disconnected properly.
    
  26.         post_signals = (
    
  27.             len(signals.pre_save.receivers),
    
  28.             len(signals.post_save.receivers),
    
  29.             len(signals.pre_delete.receivers),
    
  30.             len(signals.post_delete.receivers),
    
  31.         )
    
  32.         self.assertEqual(self.pre_signals, post_signals)
    
  33. 
    
  34. 
    
  35. class SignalTests(BaseSignalSetup, TestCase):
    
  36.     def test_model_pre_init_and_post_init(self):
    
  37.         data = []
    
  38. 
    
  39.         def pre_init_callback(sender, args, **kwargs):
    
  40.             data.append(kwargs["kwargs"])
    
  41. 
    
  42.         signals.pre_init.connect(pre_init_callback)
    
  43. 
    
  44.         def post_init_callback(sender, instance, **kwargs):
    
  45.             data.append(instance)
    
  46. 
    
  47.         signals.post_init.connect(post_init_callback)
    
  48. 
    
  49.         p1 = Person(first_name="John", last_name="Doe")
    
  50.         self.assertEqual(data, [{}, p1])
    
  51. 
    
  52.     def test_save_signals(self):
    
  53.         data = []
    
  54. 
    
  55.         def pre_save_handler(signal, sender, instance, **kwargs):
    
  56.             data.append((instance, sender, kwargs.get("raw", False)))
    
  57. 
    
  58.         def post_save_handler(signal, sender, instance, **kwargs):
    
  59.             data.append(
    
  60.                 (instance, sender, kwargs.get("created"), kwargs.get("raw", False))
    
  61.             )
    
  62. 
    
  63.         signals.pre_save.connect(pre_save_handler, weak=False)
    
  64.         signals.post_save.connect(post_save_handler, weak=False)
    
  65.         try:
    
  66.             p1 = Person.objects.create(first_name="John", last_name="Smith")
    
  67. 
    
  68.             self.assertEqual(
    
  69.                 data,
    
  70.                 [
    
  71.                     (p1, Person, False),
    
  72.                     (p1, Person, True, False),
    
  73.                 ],
    
  74.             )
    
  75.             data[:] = []
    
  76. 
    
  77.             p1.first_name = "Tom"
    
  78.             p1.save()
    
  79.             self.assertEqual(
    
  80.                 data,
    
  81.                 [
    
  82.                     (p1, Person, False),
    
  83.                     (p1, Person, False, False),
    
  84.                 ],
    
  85.             )
    
  86.             data[:] = []
    
  87. 
    
  88.             # Calling an internal method purely so that we can trigger a "raw" save.
    
  89.             p1.save_base(raw=True)
    
  90.             self.assertEqual(
    
  91.                 data,
    
  92.                 [
    
  93.                     (p1, Person, True),
    
  94.                     (p1, Person, False, True),
    
  95.                 ],
    
  96.             )
    
  97.             data[:] = []
    
  98. 
    
  99.             p2 = Person(first_name="James", last_name="Jones")
    
  100.             p2.id = 99999
    
  101.             p2.save()
    
  102.             self.assertEqual(
    
  103.                 data,
    
  104.                 [
    
  105.                     (p2, Person, False),
    
  106.                     (p2, Person, True, False),
    
  107.                 ],
    
  108.             )
    
  109.             data[:] = []
    
  110.             p2.id = 99998
    
  111.             p2.save()
    
  112.             self.assertEqual(
    
  113.                 data,
    
  114.                 [
    
  115.                     (p2, Person, False),
    
  116.                     (p2, Person, True, False),
    
  117.                 ],
    
  118.             )
    
  119. 
    
  120.             # The sender should stay the same when using defer().
    
  121.             data[:] = []
    
  122.             p3 = Person.objects.defer("first_name").get(pk=p1.pk)
    
  123.             p3.last_name = "Reese"
    
  124.             p3.save()
    
  125.             self.assertEqual(
    
  126.                 data,
    
  127.                 [
    
  128.                     (p3, Person, False),
    
  129.                     (p3, Person, False, False),
    
  130.                 ],
    
  131.             )
    
  132.         finally:
    
  133.             signals.pre_save.disconnect(pre_save_handler)
    
  134.             signals.post_save.disconnect(post_save_handler)
    
  135. 
    
  136.     def test_delete_signals(self):
    
  137.         data = []
    
  138. 
    
  139.         def pre_delete_handler(signal, sender, instance, origin, **kwargs):
    
  140.             data.append((instance, sender, instance.id is None, origin))
    
  141. 
    
  142.         # #8285: signals can be any callable
    
  143.         class PostDeleteHandler:
    
  144.             def __init__(self, data):
    
  145.                 self.data = data
    
  146. 
    
  147.             def __call__(self, signal, sender, instance, origin, **kwargs):
    
  148.                 self.data.append((instance, sender, instance.id is None, origin))
    
  149. 
    
  150.         post_delete_handler = PostDeleteHandler(data)
    
  151. 
    
  152.         signals.pre_delete.connect(pre_delete_handler, weak=False)
    
  153.         signals.post_delete.connect(post_delete_handler, weak=False)
    
  154.         try:
    
  155.             p1 = Person.objects.create(first_name="John", last_name="Smith")
    
  156.             p1.delete()
    
  157.             self.assertEqual(
    
  158.                 data,
    
  159.                 [
    
  160.                     (p1, Person, False, p1),
    
  161.                     (p1, Person, False, p1),
    
  162.                 ],
    
  163.             )
    
  164.             data[:] = []
    
  165. 
    
  166.             p2 = Person(first_name="James", last_name="Jones")
    
  167.             p2.id = 99999
    
  168.             p2.save()
    
  169.             p2.id = 99998
    
  170.             p2.save()
    
  171.             p2.delete()
    
  172.             self.assertEqual(
    
  173.                 data,
    
  174.                 [
    
  175.                     (p2, Person, False, p2),
    
  176.                     (p2, Person, False, p2),
    
  177.                 ],
    
  178.             )
    
  179.             data[:] = []
    
  180. 
    
  181.             self.assertQuerysetEqual(
    
  182.                 Person.objects.all(),
    
  183.                 [
    
  184.                     "James Jones",
    
  185.                 ],
    
  186.                 str,
    
  187.             )
    
  188.         finally:
    
  189.             signals.pre_delete.disconnect(pre_delete_handler)
    
  190.             signals.post_delete.disconnect(post_delete_handler)
    
  191. 
    
  192.     def test_delete_signals_origin_model(self):
    
  193.         data = []
    
  194. 
    
  195.         def pre_delete_handler(signal, sender, instance, origin, **kwargs):
    
  196.             data.append((sender, origin))
    
  197. 
    
  198.         def post_delete_handler(signal, sender, instance, origin, **kwargs):
    
  199.             data.append((sender, origin))
    
  200. 
    
  201.         person = Person.objects.create(first_name="John", last_name="Smith")
    
  202.         book = Book.objects.create(name="Rayuela")
    
  203.         Page.objects.create(text="Page 1", book=book)
    
  204.         Page.objects.create(text="Page 2", book=book)
    
  205. 
    
  206.         signals.pre_delete.connect(pre_delete_handler, weak=False)
    
  207.         signals.post_delete.connect(post_delete_handler, weak=False)
    
  208.         try:
    
  209.             # Instance deletion.
    
  210.             person.delete()
    
  211.             self.assertEqual(data, [(Person, person), (Person, person)])
    
  212.             data[:] = []
    
  213.             # Cascade deletion.
    
  214.             book.delete()
    
  215.             self.assertEqual(
    
  216.                 data,
    
  217.                 [
    
  218.                     (Page, book),
    
  219.                     (Page, book),
    
  220.                     (Book, book),
    
  221.                     (Page, book),
    
  222.                     (Page, book),
    
  223.                     (Book, book),
    
  224.                 ],
    
  225.             )
    
  226.         finally:
    
  227.             signals.pre_delete.disconnect(pre_delete_handler)
    
  228.             signals.post_delete.disconnect(post_delete_handler)
    
  229. 
    
  230.     def test_delete_signals_origin_queryset(self):
    
  231.         data = []
    
  232. 
    
  233.         def pre_delete_handler(signal, sender, instance, origin, **kwargs):
    
  234.             data.append((sender, origin))
    
  235. 
    
  236.         def post_delete_handler(signal, sender, instance, origin, **kwargs):
    
  237.             data.append((sender, origin))
    
  238. 
    
  239.         Person.objects.create(first_name="John", last_name="Smith")
    
  240.         book = Book.objects.create(name="Rayuela")
    
  241.         Page.objects.create(text="Page 1", book=book)
    
  242.         Page.objects.create(text="Page 2", book=book)
    
  243. 
    
  244.         signals.pre_delete.connect(pre_delete_handler, weak=False)
    
  245.         signals.post_delete.connect(post_delete_handler, weak=False)
    
  246.         try:
    
  247.             # Queryset deletion.
    
  248.             qs = Person.objects.all()
    
  249.             qs.delete()
    
  250.             self.assertEqual(data, [(Person, qs), (Person, qs)])
    
  251.             data[:] = []
    
  252.             # Cascade deletion.
    
  253.             qs = Book.objects.all()
    
  254.             qs.delete()
    
  255.             self.assertEqual(
    
  256.                 data,
    
  257.                 [
    
  258.                     (Page, qs),
    
  259.                     (Page, qs),
    
  260.                     (Book, qs),
    
  261.                     (Page, qs),
    
  262.                     (Page, qs),
    
  263.                     (Book, qs),
    
  264.                 ],
    
  265.             )
    
  266.         finally:
    
  267.             signals.pre_delete.disconnect(pre_delete_handler)
    
  268.             signals.post_delete.disconnect(post_delete_handler)
    
  269. 
    
  270.     def test_decorators(self):
    
  271.         data = []
    
  272. 
    
  273.         @receiver(signals.pre_save, weak=False)
    
  274.         def decorated_handler(signal, sender, instance, **kwargs):
    
  275.             data.append(instance)
    
  276. 
    
  277.         @receiver(signals.pre_save, sender=Car, weak=False)
    
  278.         def decorated_handler_with_sender_arg(signal, sender, instance, **kwargs):
    
  279.             data.append(instance)
    
  280. 
    
  281.         try:
    
  282.             c1 = Car.objects.create(make="Volkswagen", model="Passat")
    
  283.             self.assertEqual(data, [c1, c1])
    
  284.         finally:
    
  285.             signals.pre_save.disconnect(decorated_handler)
    
  286.             signals.pre_save.disconnect(decorated_handler_with_sender_arg, sender=Car)
    
  287. 
    
  288.     def test_save_and_delete_signals_with_m2m(self):
    
  289.         data = []
    
  290. 
    
  291.         def pre_save_handler(signal, sender, instance, **kwargs):
    
  292.             data.append("pre_save signal, %s" % instance)
    
  293.             if kwargs.get("raw"):
    
  294.                 data.append("Is raw")
    
  295. 
    
  296.         def post_save_handler(signal, sender, instance, **kwargs):
    
  297.             data.append("post_save signal, %s" % instance)
    
  298.             if "created" in kwargs:
    
  299.                 if kwargs["created"]:
    
  300.                     data.append("Is created")
    
  301.                 else:
    
  302.                     data.append("Is updated")
    
  303.             if kwargs.get("raw"):
    
  304.                 data.append("Is raw")
    
  305. 
    
  306.         def pre_delete_handler(signal, sender, instance, **kwargs):
    
  307.             data.append("pre_delete signal, %s" % instance)
    
  308.             data.append("instance.id is not None: %s" % (instance.id is not None))
    
  309. 
    
  310.         def post_delete_handler(signal, sender, instance, **kwargs):
    
  311.             data.append("post_delete signal, %s" % instance)
    
  312.             data.append("instance.id is not None: %s" % (instance.id is not None))
    
  313. 
    
  314.         signals.pre_save.connect(pre_save_handler, weak=False)
    
  315.         signals.post_save.connect(post_save_handler, weak=False)
    
  316.         signals.pre_delete.connect(pre_delete_handler, weak=False)
    
  317.         signals.post_delete.connect(post_delete_handler, weak=False)
    
  318.         try:
    
  319.             a1 = Author.objects.create(name="Neal Stephenson")
    
  320.             self.assertEqual(
    
  321.                 data,
    
  322.                 [
    
  323.                     "pre_save signal, Neal Stephenson",
    
  324.                     "post_save signal, Neal Stephenson",
    
  325.                     "Is created",
    
  326.                 ],
    
  327.             )
    
  328.             data[:] = []
    
  329. 
    
  330.             b1 = Book.objects.create(name="Snow Crash")
    
  331.             self.assertEqual(
    
  332.                 data,
    
  333.                 [
    
  334.                     "pre_save signal, Snow Crash",
    
  335.                     "post_save signal, Snow Crash",
    
  336.                     "Is created",
    
  337.                 ],
    
  338.             )
    
  339.             data[:] = []
    
  340. 
    
  341.             # Assigning and removing to/from m2m shouldn't generate an m2m signal.
    
  342.             b1.authors.set([a1])
    
  343.             self.assertEqual(data, [])
    
  344.             b1.authors.set([])
    
  345.             self.assertEqual(data, [])
    
  346.         finally:
    
  347.             signals.pre_save.disconnect(pre_save_handler)
    
  348.             signals.post_save.disconnect(post_save_handler)
    
  349.             signals.pre_delete.disconnect(pre_delete_handler)
    
  350.             signals.post_delete.disconnect(post_delete_handler)
    
  351. 
    
  352.     def test_disconnect_in_dispatch(self):
    
  353.         """
    
  354.         Signals that disconnect when being called don't mess future
    
  355.         dispatching.
    
  356.         """
    
  357. 
    
  358.         class Handler:
    
  359.             def __init__(self, param):
    
  360.                 self.param = param
    
  361.                 self._run = False
    
  362. 
    
  363.             def __call__(self, signal, sender, **kwargs):
    
  364.                 self._run = True
    
  365.                 signal.disconnect(receiver=self, sender=sender)
    
  366. 
    
  367.         a, b = Handler(1), Handler(2)
    
  368.         signals.post_save.connect(a, sender=Person, weak=False)
    
  369.         signals.post_save.connect(b, sender=Person, weak=False)
    
  370.         Person.objects.create(first_name="John", last_name="Smith")
    
  371. 
    
  372.         self.assertTrue(a._run)
    
  373.         self.assertTrue(b._run)
    
  374.         self.assertEqual(signals.post_save.receivers, [])
    
  375. 
    
  376.     @mock.patch("weakref.ref")
    
  377.     def test_lazy_model_signal(self, ref):
    
  378.         def callback(sender, args, **kwargs):
    
  379.             pass
    
  380. 
    
  381.         signals.pre_init.connect(callback)
    
  382.         signals.pre_init.disconnect(callback)
    
  383.         self.assertTrue(ref.called)
    
  384.         ref.reset_mock()
    
  385. 
    
  386.         signals.pre_init.connect(callback, weak=False)
    
  387.         signals.pre_init.disconnect(callback)
    
  388.         ref.assert_not_called()
    
  389. 
    
  390.     @isolate_apps("signals", kwarg_name="apps")
    
  391.     def test_disconnect_model(self, apps):
    
  392.         received = []
    
  393. 
    
  394.         def receiver(**kwargs):
    
  395.             received.append(kwargs)
    
  396. 
    
  397.         class Created(models.Model):
    
  398.             pass
    
  399. 
    
  400.         signals.post_init.connect(receiver, sender=Created, apps=apps)
    
  401.         try:
    
  402.             self.assertIs(
    
  403.                 signals.post_init.disconnect(receiver, sender=Created, apps=apps),
    
  404.                 True,
    
  405.             )
    
  406.             self.assertIs(
    
  407.                 signals.post_init.disconnect(receiver, sender=Created, apps=apps),
    
  408.                 False,
    
  409.             )
    
  410.             Created()
    
  411.             self.assertEqual(received, [])
    
  412.         finally:
    
  413.             signals.post_init.disconnect(receiver, sender=Created)
    
  414. 
    
  415. 
    
  416. class LazyModelRefTests(BaseSignalSetup, SimpleTestCase):
    
  417.     def setUp(self):
    
  418.         super().setUp()
    
  419.         self.received = []
    
  420. 
    
  421.     def receiver(self, **kwargs):
    
  422.         self.received.append(kwargs)
    
  423. 
    
  424.     def test_invalid_sender_model_name(self):
    
  425.         msg = (
    
  426.             "Invalid model reference 'invalid'. String model references must be of the "
    
  427.             "form 'app_label.ModelName'."
    
  428.         )
    
  429.         with self.assertRaisesMessage(ValueError, msg):
    
  430.             signals.post_init.connect(self.receiver, sender="invalid")
    
  431. 
    
  432.     def test_already_loaded_model(self):
    
  433.         signals.post_init.connect(self.receiver, sender="signals.Book", weak=False)
    
  434.         try:
    
  435.             instance = Book()
    
  436.             self.assertEqual(
    
  437.                 self.received,
    
  438.                 [{"signal": signals.post_init, "sender": Book, "instance": instance}],
    
  439.             )
    
  440.         finally:
    
  441.             signals.post_init.disconnect(self.receiver, sender=Book)
    
  442. 
    
  443.     @isolate_apps("signals", kwarg_name="apps")
    
  444.     def test_not_loaded_model(self, apps):
    
  445.         signals.post_init.connect(
    
  446.             self.receiver, sender="signals.Created", weak=False, apps=apps
    
  447.         )
    
  448. 
    
  449.         try:
    
  450. 
    
  451.             class Created(models.Model):
    
  452.                 pass
    
  453. 
    
  454.             instance = Created()
    
  455.             self.assertEqual(
    
  456.                 self.received,
    
  457.                 [
    
  458.                     {
    
  459.                         "signal": signals.post_init,
    
  460.                         "sender": Created,
    
  461.                         "instance": instance,
    
  462.                     }
    
  463.                 ],
    
  464.             )
    
  465.         finally:
    
  466.             signals.post_init.disconnect(self.receiver, sender=Created)
    
  467. 
    
  468.     @isolate_apps("signals", kwarg_name="apps")
    
  469.     def test_disconnect_registered_model(self, apps):
    
  470.         received = []
    
  471. 
    
  472.         def receiver(**kwargs):
    
  473.             received.append(kwargs)
    
  474. 
    
  475.         class Created(models.Model):
    
  476.             pass
    
  477. 
    
  478.         signals.post_init.connect(receiver, sender="signals.Created", apps=apps)
    
  479.         try:
    
  480.             self.assertIsNone(
    
  481.                 signals.post_init.disconnect(
    
  482.                     receiver, sender="signals.Created", apps=apps
    
  483.                 )
    
  484.             )
    
  485.             self.assertIsNone(
    
  486.                 signals.post_init.disconnect(
    
  487.                     receiver, sender="signals.Created", apps=apps
    
  488.                 )
    
  489.             )
    
  490.             Created()
    
  491.             self.assertEqual(received, [])
    
  492.         finally:
    
  493.             signals.post_init.disconnect(receiver, sender="signals.Created")
    
  494. 
    
  495.     @isolate_apps("signals", kwarg_name="apps")
    
  496.     def test_disconnect_unregistered_model(self, apps):
    
  497.         received = []
    
  498. 
    
  499.         def receiver(**kwargs):
    
  500.             received.append(kwargs)
    
  501. 
    
  502.         signals.post_init.connect(receiver, sender="signals.Created", apps=apps)
    
  503.         try:
    
  504.             self.assertIsNone(
    
  505.                 signals.post_init.disconnect(
    
  506.                     receiver, sender="signals.Created", apps=apps
    
  507.                 )
    
  508.             )
    
  509.             self.assertIsNone(
    
  510.                 signals.post_init.disconnect(
    
  511.                     receiver, sender="signals.Created", apps=apps
    
  512.                 )
    
  513.             )
    
  514. 
    
  515.             class Created(models.Model):
    
  516.                 pass
    
  517. 
    
  518.             Created()
    
  519.             self.assertEqual(received, [])
    
  520.         finally:
    
  521.             signals.post_init.disconnect(receiver, sender="signals.Created")
    
  522. 
    
  523.     def test_register_model_class_senders_immediately(self):
    
  524.         """
    
  525.         Model signals registered with model classes as senders don't use the
    
  526.         Apps.lazy_model_operation() mechanism.
    
  527.         """
    
  528.         # Book isn't registered with apps2, so it will linger in
    
  529.         # apps2._pending_operations if ModelSignal does the wrong thing.
    
  530.         apps2 = Apps()
    
  531.         signals.post_init.connect(self.receiver, sender=Book, apps=apps2)
    
  532.         self.assertEqual(list(apps2._pending_operations), [])