1. import gc
    
  2. import sys
    
  3. import weakref
    
  4. from types import TracebackType
    
  5. 
    
  6. from django.dispatch import Signal, receiver
    
  7. from django.test import SimpleTestCase
    
  8. from django.test.utils import override_settings
    
  9. 
    
  10. if hasattr(sys, "pypy_version_info"):
    
  11. 
    
  12.     def garbage_collect():
    
  13.         # Collecting weakreferences can take two collections on PyPy.
    
  14.         gc.collect()
    
  15.         gc.collect()
    
  16. 
    
  17. else:
    
  18. 
    
  19.     def garbage_collect():
    
  20.         gc.collect()
    
  21. 
    
  22. 
    
  23. def receiver_1_arg(val, **kwargs):
    
  24.     return val
    
  25. 
    
  26. 
    
  27. class Callable:
    
  28.     def __call__(self, val, **kwargs):
    
  29.         return val
    
  30. 
    
  31.     def a(self, val, **kwargs):
    
  32.         return val
    
  33. 
    
  34. 
    
  35. a_signal = Signal()
    
  36. b_signal = Signal()
    
  37. c_signal = Signal()
    
  38. d_signal = Signal(use_caching=True)
    
  39. 
    
  40. 
    
  41. class DispatcherTests(SimpleTestCase):
    
  42.     def assertTestIsClean(self, signal):
    
  43.         """Assert that everything has been cleaned up automatically"""
    
  44.         # Note that dead weakref cleanup happens as side effect of using
    
  45.         # the signal's receivers through the signals API. So, first do a
    
  46.         # call to an API method to force cleanup.
    
  47.         self.assertFalse(signal.has_listeners())
    
  48.         self.assertEqual(signal.receivers, [])
    
  49. 
    
  50.     @override_settings(DEBUG=True)
    
  51.     def test_cannot_connect_no_kwargs(self):
    
  52.         def receiver_no_kwargs(sender):
    
  53.             pass
    
  54. 
    
  55.         msg = "Signal receivers must accept keyword arguments (**kwargs)."
    
  56.         with self.assertRaisesMessage(ValueError, msg):
    
  57.             a_signal.connect(receiver_no_kwargs)
    
  58.         self.assertTestIsClean(a_signal)
    
  59. 
    
  60.     @override_settings(DEBUG=True)
    
  61.     def test_cannot_connect_non_callable(self):
    
  62.         msg = "Signal receivers must be callable."
    
  63.         with self.assertRaisesMessage(TypeError, msg):
    
  64.             a_signal.connect(object())
    
  65.         self.assertTestIsClean(a_signal)
    
  66. 
    
  67.     def test_send(self):
    
  68.         a_signal.connect(receiver_1_arg, sender=self)
    
  69.         result = a_signal.send(sender=self, val="test")
    
  70.         self.assertEqual(result, [(receiver_1_arg, "test")])
    
  71.         a_signal.disconnect(receiver_1_arg, sender=self)
    
  72.         self.assertTestIsClean(a_signal)
    
  73. 
    
  74.     def test_send_no_receivers(self):
    
  75.         result = a_signal.send(sender=self, val="test")
    
  76.         self.assertEqual(result, [])
    
  77. 
    
  78.     def test_send_connected_no_sender(self):
    
  79.         a_signal.connect(receiver_1_arg)
    
  80.         result = a_signal.send(sender=self, val="test")
    
  81.         self.assertEqual(result, [(receiver_1_arg, "test")])
    
  82.         a_signal.disconnect(receiver_1_arg)
    
  83.         self.assertTestIsClean(a_signal)
    
  84. 
    
  85.     def test_send_different_no_sender(self):
    
  86.         a_signal.connect(receiver_1_arg, sender=object)
    
  87.         result = a_signal.send(sender=self, val="test")
    
  88.         self.assertEqual(result, [])
    
  89.         a_signal.disconnect(receiver_1_arg, sender=object)
    
  90.         self.assertTestIsClean(a_signal)
    
  91. 
    
  92.     def test_garbage_collected(self):
    
  93.         a = Callable()
    
  94.         a_signal.connect(a.a, sender=self)
    
  95.         del a
    
  96.         garbage_collect()
    
  97.         result = a_signal.send(sender=self, val="test")
    
  98.         self.assertEqual(result, [])
    
  99.         self.assertTestIsClean(a_signal)
    
  100. 
    
  101.     def test_cached_garbaged_collected(self):
    
  102.         """
    
  103.         Make sure signal caching sender receivers don't prevent garbage
    
  104.         collection of senders.
    
  105.         """
    
  106. 
    
  107.         class sender:
    
  108.             pass
    
  109. 
    
  110.         wref = weakref.ref(sender)
    
  111.         d_signal.connect(receiver_1_arg)
    
  112.         d_signal.send(sender, val="garbage")
    
  113.         del sender
    
  114.         garbage_collect()
    
  115.         try:
    
  116.             self.assertIsNone(wref())
    
  117.         finally:
    
  118.             # Disconnect after reference check since it flushes the tested cache.
    
  119.             d_signal.disconnect(receiver_1_arg)
    
  120. 
    
  121.     def test_multiple_registration(self):
    
  122.         a = Callable()
    
  123.         a_signal.connect(a)
    
  124.         a_signal.connect(a)
    
  125.         a_signal.connect(a)
    
  126.         a_signal.connect(a)
    
  127.         a_signal.connect(a)
    
  128.         a_signal.connect(a)
    
  129.         result = a_signal.send(sender=self, val="test")
    
  130.         self.assertEqual(len(result), 1)
    
  131.         self.assertEqual(len(a_signal.receivers), 1)
    
  132.         del a
    
  133.         del result
    
  134.         garbage_collect()
    
  135.         self.assertTestIsClean(a_signal)
    
  136. 
    
  137.     def test_uid_registration(self):
    
  138.         def uid_based_receiver_1(**kwargs):
    
  139.             pass
    
  140. 
    
  141.         def uid_based_receiver_2(**kwargs):
    
  142.             pass
    
  143. 
    
  144.         a_signal.connect(uid_based_receiver_1, dispatch_uid="uid")
    
  145.         a_signal.connect(uid_based_receiver_2, dispatch_uid="uid")
    
  146.         self.assertEqual(len(a_signal.receivers), 1)
    
  147.         a_signal.disconnect(dispatch_uid="uid")
    
  148.         self.assertTestIsClean(a_signal)
    
  149. 
    
  150.     def test_send_robust_success(self):
    
  151.         a_signal.connect(receiver_1_arg)
    
  152.         result = a_signal.send_robust(sender=self, val="test")
    
  153.         self.assertEqual(result, [(receiver_1_arg, "test")])
    
  154.         a_signal.disconnect(receiver_1_arg)
    
  155.         self.assertTestIsClean(a_signal)
    
  156. 
    
  157.     def test_send_robust_no_receivers(self):
    
  158.         result = a_signal.send_robust(sender=self, val="test")
    
  159.         self.assertEqual(result, [])
    
  160. 
    
  161.     def test_send_robust_ignored_sender(self):
    
  162.         a_signal.connect(receiver_1_arg)
    
  163.         result = a_signal.send_robust(sender=self, val="test")
    
  164.         self.assertEqual(result, [(receiver_1_arg, "test")])
    
  165.         a_signal.disconnect(receiver_1_arg)
    
  166.         self.assertTestIsClean(a_signal)
    
  167. 
    
  168.     def test_send_robust_fail(self):
    
  169.         def fails(val, **kwargs):
    
  170.             raise ValueError("this")
    
  171. 
    
  172.         a_signal.connect(fails)
    
  173.         try:
    
  174.             with self.assertLogs("django.dispatch", "ERROR") as cm:
    
  175.                 result = a_signal.send_robust(sender=self, val="test")
    
  176.             err = result[0][1]
    
  177.             self.assertIsInstance(err, ValueError)
    
  178.             self.assertEqual(err.args, ("this",))
    
  179.             self.assertIs(hasattr(err, "__traceback__"), True)
    
  180.             self.assertIsInstance(err.__traceback__, TracebackType)
    
  181. 
    
  182.             log_record = cm.records[0]
    
  183.             self.assertEqual(
    
  184.                 log_record.getMessage(),
    
  185.                 "Error calling "
    
  186.                 "DispatcherTests.test_send_robust_fail.<locals>.fails in "
    
  187.                 "Signal.send_robust() (this)",
    
  188.             )
    
  189.             self.assertIsNotNone(log_record.exc_info)
    
  190.             _, exc_value, _ = log_record.exc_info
    
  191.             self.assertIsInstance(exc_value, ValueError)
    
  192.             self.assertEqual(str(exc_value), "this")
    
  193.         finally:
    
  194.             a_signal.disconnect(fails)
    
  195.         self.assertTestIsClean(a_signal)
    
  196. 
    
  197.     def test_disconnection(self):
    
  198.         receiver_1 = Callable()
    
  199.         receiver_2 = Callable()
    
  200.         receiver_3 = Callable()
    
  201.         a_signal.connect(receiver_1)
    
  202.         a_signal.connect(receiver_2)
    
  203.         a_signal.connect(receiver_3)
    
  204.         a_signal.disconnect(receiver_1)
    
  205.         del receiver_2
    
  206.         garbage_collect()
    
  207.         a_signal.disconnect(receiver_3)
    
  208.         self.assertTestIsClean(a_signal)
    
  209. 
    
  210.     def test_values_returned_by_disconnection(self):
    
  211.         receiver_1 = Callable()
    
  212.         receiver_2 = Callable()
    
  213.         a_signal.connect(receiver_1)
    
  214.         receiver_1_disconnected = a_signal.disconnect(receiver_1)
    
  215.         receiver_2_disconnected = a_signal.disconnect(receiver_2)
    
  216.         self.assertTrue(receiver_1_disconnected)
    
  217.         self.assertFalse(receiver_2_disconnected)
    
  218.         self.assertTestIsClean(a_signal)
    
  219. 
    
  220.     def test_has_listeners(self):
    
  221.         self.assertFalse(a_signal.has_listeners())
    
  222.         self.assertFalse(a_signal.has_listeners(sender=object()))
    
  223.         receiver_1 = Callable()
    
  224.         a_signal.connect(receiver_1)
    
  225.         self.assertTrue(a_signal.has_listeners())
    
  226.         self.assertTrue(a_signal.has_listeners(sender=object()))
    
  227.         a_signal.disconnect(receiver_1)
    
  228.         self.assertFalse(a_signal.has_listeners())
    
  229.         self.assertFalse(a_signal.has_listeners(sender=object()))
    
  230. 
    
  231. 
    
  232. class ReceiverTestCase(SimpleTestCase):
    
  233.     def test_receiver_single_signal(self):
    
  234.         @receiver(a_signal)
    
  235.         def f(val, **kwargs):
    
  236.             self.state = val
    
  237. 
    
  238.         self.state = False
    
  239.         a_signal.send(sender=self, val=True)
    
  240.         self.assertTrue(self.state)
    
  241. 
    
  242.     def test_receiver_signal_list(self):
    
  243.         @receiver([a_signal, b_signal, c_signal])
    
  244.         def f(val, **kwargs):
    
  245.             self.state.append(val)
    
  246. 
    
  247.         self.state = []
    
  248.         a_signal.send(sender=self, val="a")
    
  249.         c_signal.send(sender=self, val="c")
    
  250.         b_signal.send(sender=self, val="b")
    
  251.         self.assertIn("a", self.state)
    
  252.         self.assertIn("b", self.state)
    
  253.         self.assertIn("c", self.state)