1. from io import StringIO
    
  2. 
    
  3. from django.apps import apps
    
  4. from django.core import management
    
  5. from django.db import migrations
    
  6. from django.db.models import signals
    
  7. from django.test import TransactionTestCase, override_settings
    
  8. 
    
  9. APP_CONFIG = apps.get_app_config("migrate_signals")
    
  10. SIGNAL_ARGS = [
    
  11.     "app_config",
    
  12.     "verbosity",
    
  13.     "interactive",
    
  14.     "using",
    
  15.     "stdout",
    
  16.     "plan",
    
  17.     "apps",
    
  18. ]
    
  19. MIGRATE_DATABASE = "default"
    
  20. MIGRATE_VERBOSITY = 0
    
  21. MIGRATE_INTERACTIVE = False
    
  22. 
    
  23. 
    
  24. class Receiver:
    
  25.     def __init__(self, signal):
    
  26.         self.call_counter = 0
    
  27.         self.call_args = None
    
  28.         signal.connect(self, sender=APP_CONFIG)
    
  29. 
    
  30.     def __call__(self, signal, sender, **kwargs):
    
  31.         self.call_counter += 1
    
  32.         self.call_args = kwargs
    
  33. 
    
  34. 
    
  35. class OneTimeReceiver:
    
  36.     """
    
  37.     Special receiver for handle the fact that test runner calls migrate for
    
  38.     several databases and several times for some of them.
    
  39.     """
    
  40. 
    
  41.     def __init__(self, signal):
    
  42.         self.signal = signal
    
  43.         self.call_counter = 0
    
  44.         self.call_args = None
    
  45.         self.signal.connect(self, sender=APP_CONFIG)
    
  46. 
    
  47.     def __call__(self, signal, sender, **kwargs):
    
  48.         # Although test runner calls migrate for several databases,
    
  49.         # testing for only one of them is quite sufficient.
    
  50.         if kwargs["using"] == MIGRATE_DATABASE:
    
  51.             self.call_counter += 1
    
  52.             self.call_args = kwargs
    
  53.             # we need to test only one call of migrate
    
  54.             self.signal.disconnect(self, sender=APP_CONFIG)
    
  55. 
    
  56. 
    
  57. # We connect receiver here and not in unit test code because we need to
    
  58. # connect receiver before test runner creates database.  That is, sequence of
    
  59. # actions would be:
    
  60. #
    
  61. #   1. Test runner imports this module.
    
  62. #   2. We connect receiver.
    
  63. #   3. Test runner calls migrate for create default database.
    
  64. #   4. Test runner execute our unit test code.
    
  65. pre_migrate_receiver = OneTimeReceiver(signals.pre_migrate)
    
  66. post_migrate_receiver = OneTimeReceiver(signals.post_migrate)
    
  67. 
    
  68. 
    
  69. class MigrateSignalTests(TransactionTestCase):
    
  70.     available_apps = ["migrate_signals"]
    
  71. 
    
  72.     def test_call_time(self):
    
  73.         self.assertEqual(pre_migrate_receiver.call_counter, 1)
    
  74.         self.assertEqual(post_migrate_receiver.call_counter, 1)
    
  75. 
    
  76.     def test_args(self):
    
  77.         pre_migrate_receiver = Receiver(signals.pre_migrate)
    
  78.         post_migrate_receiver = Receiver(signals.post_migrate)
    
  79.         management.call_command(
    
  80.             "migrate",
    
  81.             database=MIGRATE_DATABASE,
    
  82.             verbosity=MIGRATE_VERBOSITY,
    
  83.             interactive=MIGRATE_INTERACTIVE,
    
  84.             stdout=StringIO("test_args"),
    
  85.         )
    
  86. 
    
  87.         for receiver in [pre_migrate_receiver, post_migrate_receiver]:
    
  88.             with self.subTest(receiver=receiver):
    
  89.                 args = receiver.call_args
    
  90.                 self.assertEqual(receiver.call_counter, 1)
    
  91.                 self.assertEqual(set(args), set(SIGNAL_ARGS))
    
  92.                 self.assertEqual(args["app_config"], APP_CONFIG)
    
  93.                 self.assertEqual(args["verbosity"], MIGRATE_VERBOSITY)
    
  94.                 self.assertEqual(args["interactive"], MIGRATE_INTERACTIVE)
    
  95.                 self.assertEqual(args["using"], "default")
    
  96.                 self.assertIn("test_args", args["stdout"].getvalue())
    
  97.                 self.assertEqual(args["plan"], [])
    
  98.                 self.assertIsInstance(args["apps"], migrations.state.StateApps)
    
  99. 
    
  100.     @override_settings(
    
  101.         MIGRATION_MODULES={"migrate_signals": "migrate_signals.custom_migrations"}
    
  102.     )
    
  103.     def test_migrations_only(self):
    
  104.         """
    
  105.         If all apps have migrations, migration signals should be sent.
    
  106.         """
    
  107.         pre_migrate_receiver = Receiver(signals.pre_migrate)
    
  108.         post_migrate_receiver = Receiver(signals.post_migrate)
    
  109.         management.call_command(
    
  110.             "migrate",
    
  111.             database=MIGRATE_DATABASE,
    
  112.             verbosity=MIGRATE_VERBOSITY,
    
  113.             interactive=MIGRATE_INTERACTIVE,
    
  114.         )
    
  115.         for receiver in [pre_migrate_receiver, post_migrate_receiver]:
    
  116.             args = receiver.call_args
    
  117.             self.assertEqual(receiver.call_counter, 1)
    
  118.             self.assertEqual(set(args), set(SIGNAL_ARGS))
    
  119.             self.assertEqual(args["app_config"], APP_CONFIG)
    
  120.             self.assertEqual(args["verbosity"], MIGRATE_VERBOSITY)
    
  121.             self.assertEqual(args["interactive"], MIGRATE_INTERACTIVE)
    
  122.             self.assertEqual(args["using"], "default")
    
  123.             self.assertIsInstance(args["plan"][0][0], migrations.Migration)
    
  124.             # The migration isn't applied backward.
    
  125.             self.assertFalse(args["plan"][0][1])
    
  126.             self.assertIsInstance(args["apps"], migrations.state.StateApps)
    
  127.         self.assertEqual(pre_migrate_receiver.call_args["apps"].get_models(), [])
    
  128.         self.assertEqual(
    
  129.             [
    
  130.                 model._meta.label
    
  131.                 for model in post_migrate_receiver.call_args["apps"].get_models()
    
  132.             ],
    
  133.             ["migrate_signals.Signal"],
    
  134.         )
    
  135.         # Migrating with an empty plan.
    
  136.         pre_migrate_receiver = Receiver(signals.pre_migrate)
    
  137.         post_migrate_receiver = Receiver(signals.post_migrate)
    
  138.         management.call_command(
    
  139.             "migrate",
    
  140.             database=MIGRATE_DATABASE,
    
  141.             verbosity=MIGRATE_VERBOSITY,
    
  142.             interactive=MIGRATE_INTERACTIVE,
    
  143.         )
    
  144.         self.assertEqual(
    
  145.             [
    
  146.                 model._meta.label
    
  147.                 for model in pre_migrate_receiver.call_args["apps"].get_models()
    
  148.             ],
    
  149.             ["migrate_signals.Signal"],
    
  150.         )
    
  151.         self.assertEqual(
    
  152.             [
    
  153.                 model._meta.label
    
  154.                 for model in post_migrate_receiver.call_args["apps"].get_models()
    
  155.             ],
    
  156.             ["migrate_signals.Signal"],
    
  157.         )