1. import compileall
    
  2. import os
    
  3. from importlib import import_module
    
  4. 
    
  5. from django.db import connection, connections
    
  6. from django.db.migrations.exceptions import (
    
  7.     AmbiguityError,
    
  8.     InconsistentMigrationHistory,
    
  9.     NodeNotFoundError,
    
  10. )
    
  11. from django.db.migrations.loader import MigrationLoader
    
  12. from django.db.migrations.recorder import MigrationRecorder
    
  13. from django.test import TestCase, modify_settings, override_settings
    
  14. 
    
  15. from .test_base import MigrationTestBase
    
  16. 
    
  17. 
    
  18. class RecorderTests(TestCase):
    
  19.     """
    
  20.     Tests recording migrations as applied or not.
    
  21.     """
    
  22. 
    
  23.     databases = {"default", "other"}
    
  24. 
    
  25.     def test_apply(self):
    
  26.         """
    
  27.         Tests marking migrations as applied/unapplied.
    
  28.         """
    
  29.         recorder = MigrationRecorder(connection)
    
  30.         self.assertEqual(
    
  31.             {(x, y) for (x, y) in recorder.applied_migrations() if x == "myapp"},
    
  32.             set(),
    
  33.         )
    
  34.         recorder.record_applied("myapp", "0432_ponies")
    
  35.         self.assertEqual(
    
  36.             {(x, y) for (x, y) in recorder.applied_migrations() if x == "myapp"},
    
  37.             {("myapp", "0432_ponies")},
    
  38.         )
    
  39.         # That should not affect records of another database
    
  40.         recorder_other = MigrationRecorder(connections["other"])
    
  41.         self.assertEqual(
    
  42.             {(x, y) for (x, y) in recorder_other.applied_migrations() if x == "myapp"},
    
  43.             set(),
    
  44.         )
    
  45.         recorder.record_unapplied("myapp", "0432_ponies")
    
  46.         self.assertEqual(
    
  47.             {(x, y) for (x, y) in recorder.applied_migrations() if x == "myapp"},
    
  48.             set(),
    
  49.         )
    
  50. 
    
  51. 
    
  52. class LoaderTests(TestCase):
    
  53.     """
    
  54.     Tests the disk and database loader, and running through migrations
    
  55.     in memory.
    
  56.     """
    
  57. 
    
  58.     def setUp(self):
    
  59.         self.applied_records = []
    
  60. 
    
  61.     def tearDown(self):
    
  62.         # Unapply records on databases that don't roll back changes after each
    
  63.         # test method.
    
  64.         if not connection.features.supports_transactions:
    
  65.             for recorder, app, name in self.applied_records:
    
  66.                 recorder.record_unapplied(app, name)
    
  67. 
    
  68.     def record_applied(self, recorder, app, name):
    
  69.         recorder.record_applied(app, name)
    
  70.         self.applied_records.append((recorder, app, name))
    
  71. 
    
  72.     @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
    
  73.     @modify_settings(INSTALLED_APPS={"append": "basic"})
    
  74.     def test_load(self):
    
  75.         """
    
  76.         Makes sure the loader can load the migrations for the test apps,
    
  77.         and then render them out to a new Apps.
    
  78.         """
    
  79.         # Load and test the plan
    
  80.         migration_loader = MigrationLoader(connection)
    
  81.         self.assertEqual(
    
  82.             migration_loader.graph.forwards_plan(("migrations", "0002_second")),
    
  83.             [
    
  84.                 ("migrations", "0001_initial"),
    
  85.                 ("migrations", "0002_second"),
    
  86.             ],
    
  87.         )
    
  88.         # Now render it out!
    
  89.         project_state = migration_loader.project_state(("migrations", "0002_second"))
    
  90.         self.assertEqual(len(project_state.models), 2)
    
  91. 
    
  92.         author_state = project_state.models["migrations", "author"]
    
  93.         self.assertEqual(
    
  94.             list(author_state.fields), ["id", "name", "slug", "age", "rating"]
    
  95.         )
    
  96. 
    
  97.         book_state = project_state.models["migrations", "book"]
    
  98.         self.assertEqual(list(book_state.fields), ["id", "author"])
    
  99. 
    
  100.         # Ensure we've included unmigrated apps in there too
    
  101.         self.assertIn("basic", project_state.real_apps)
    
  102. 
    
  103.     @override_settings(
    
  104.         MIGRATION_MODULES={
    
  105.             "migrations": "migrations.test_migrations",
    
  106.             "migrations2": "migrations2.test_migrations_2",
    
  107.         }
    
  108.     )
    
  109.     @modify_settings(INSTALLED_APPS={"append": "migrations2"})
    
  110.     def test_plan_handles_repeated_migrations(self):
    
  111.         """
    
  112.         _generate_plan() doesn't readd migrations already in the plan (#29180).
    
  113.         """
    
  114.         migration_loader = MigrationLoader(connection)
    
  115.         nodes = [("migrations", "0002_second"), ("migrations2", "0001_initial")]
    
  116.         self.assertEqual(
    
  117.             migration_loader.graph._generate_plan(nodes, at_end=True),
    
  118.             [
    
  119.                 ("migrations", "0001_initial"),
    
  120.                 ("migrations", "0002_second"),
    
  121.                 ("migrations2", "0001_initial"),
    
  122.             ],
    
  123.         )
    
  124. 
    
  125.     @override_settings(
    
  126.         MIGRATION_MODULES={"migrations": "migrations.test_migrations_unmigdep"}
    
  127.     )
    
  128.     def test_load_unmigrated_dependency(self):
    
  129.         """
    
  130.         The loader can load migrations with a dependency on an unmigrated app.
    
  131.         """
    
  132.         # Load and test the plan
    
  133.         migration_loader = MigrationLoader(connection)
    
  134.         self.assertEqual(
    
  135.             migration_loader.graph.forwards_plan(("migrations", "0001_initial")),
    
  136.             [
    
  137.                 ("contenttypes", "0001_initial"),
    
  138.                 ("auth", "0001_initial"),
    
  139.                 ("migrations", "0001_initial"),
    
  140.             ],
    
  141.         )
    
  142.         # Now render it out!
    
  143.         project_state = migration_loader.project_state(("migrations", "0001_initial"))
    
  144.         self.assertEqual(
    
  145.             len([m for a, m in project_state.models if a == "migrations"]), 1
    
  146.         )
    
  147. 
    
  148.         book_state = project_state.models["migrations", "book"]
    
  149.         self.assertEqual(list(book_state.fields), ["id", "user"])
    
  150. 
    
  151.     @override_settings(
    
  152.         MIGRATION_MODULES={"migrations": "migrations.test_migrations_run_before"}
    
  153.     )
    
  154.     def test_run_before(self):
    
  155.         """
    
  156.         Makes sure the loader uses Migration.run_before.
    
  157.         """
    
  158.         # Load and test the plan
    
  159.         migration_loader = MigrationLoader(connection)
    
  160.         self.assertEqual(
    
  161.             migration_loader.graph.forwards_plan(("migrations", "0002_second")),
    
  162.             [
    
  163.                 ("migrations", "0001_initial"),
    
  164.                 ("migrations", "0003_third"),
    
  165.                 ("migrations", "0002_second"),
    
  166.             ],
    
  167.         )
    
  168. 
    
  169.     @override_settings(
    
  170.         MIGRATION_MODULES={
    
  171.             "migrations": "migrations.test_migrations_first",
    
  172.             "migrations2": "migrations2.test_migrations_2_first",
    
  173.         }
    
  174.     )
    
  175.     @modify_settings(INSTALLED_APPS={"append": "migrations2"})
    
  176.     def test_first(self):
    
  177.         """
    
  178.         Makes sure the '__first__' migrations build correctly.
    
  179.         """
    
  180.         migration_loader = MigrationLoader(connection)
    
  181.         self.assertEqual(
    
  182.             migration_loader.graph.forwards_plan(("migrations", "second")),
    
  183.             [
    
  184.                 ("migrations", "thefirst"),
    
  185.                 ("migrations2", "0001_initial"),
    
  186.                 ("migrations2", "0002_second"),
    
  187.                 ("migrations", "second"),
    
  188.             ],
    
  189.         )
    
  190. 
    
  191.     @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
    
  192.     def test_name_match(self):
    
  193.         "Tests prefix name matching"
    
  194.         migration_loader = MigrationLoader(connection)
    
  195.         self.assertEqual(
    
  196.             migration_loader.get_migration_by_prefix("migrations", "0001").name,
    
  197.             "0001_initial",
    
  198.         )
    
  199.         msg = "There is more than one migration for 'migrations' with the prefix '0'"
    
  200.         with self.assertRaisesMessage(AmbiguityError, msg):
    
  201.             migration_loader.get_migration_by_prefix("migrations", "0")
    
  202.         msg = "There is no migration for 'migrations' with the prefix 'blarg'"
    
  203.         with self.assertRaisesMessage(KeyError, msg):
    
  204.             migration_loader.get_migration_by_prefix("migrations", "blarg")
    
  205. 
    
  206.     def test_load_import_error(self):
    
  207.         with override_settings(
    
  208.             MIGRATION_MODULES={"migrations": "import_error_package"}
    
  209.         ):
    
  210.             with self.assertRaises(ImportError):
    
  211.                 MigrationLoader(connection)
    
  212. 
    
  213.     def test_load_module_file(self):
    
  214.         with override_settings(
    
  215.             MIGRATION_MODULES={"migrations": "migrations.faulty_migrations.file"}
    
  216.         ):
    
  217.             loader = MigrationLoader(connection)
    
  218.             self.assertIn(
    
  219.                 "migrations",
    
  220.                 loader.unmigrated_apps,
    
  221.                 "App with migrations module file not in unmigrated apps.",
    
  222.             )
    
  223. 
    
  224.     def test_load_empty_dir(self):
    
  225.         with override_settings(
    
  226.             MIGRATION_MODULES={"migrations": "migrations.faulty_migrations.namespace"}
    
  227.         ):
    
  228.             loader = MigrationLoader(connection)
    
  229.             self.assertIn(
    
  230.                 "migrations",
    
  231.                 loader.unmigrated_apps,
    
  232.                 "App missing __init__.py in migrations module not in unmigrated apps.",
    
  233.             )
    
  234. 
    
  235.     @override_settings(
    
  236.         INSTALLED_APPS=["migrations.migrations_test_apps.migrated_app"],
    
  237.     )
    
  238.     def test_marked_as_migrated(self):
    
  239.         """
    
  240.         Undefined MIGRATION_MODULES implies default migration module.
    
  241.         """
    
  242.         migration_loader = MigrationLoader(connection)
    
  243.         self.assertEqual(migration_loader.migrated_apps, {"migrated_app"})
    
  244.         self.assertEqual(migration_loader.unmigrated_apps, set())
    
  245. 
    
  246.     @override_settings(
    
  247.         INSTALLED_APPS=["migrations.migrations_test_apps.migrated_app"],
    
  248.         MIGRATION_MODULES={"migrated_app": None},
    
  249.     )
    
  250.     def test_marked_as_unmigrated(self):
    
  251.         """
    
  252.         MIGRATION_MODULES allows disabling of migrations for a particular app.
    
  253.         """
    
  254.         migration_loader = MigrationLoader(connection)
    
  255.         self.assertEqual(migration_loader.migrated_apps, set())
    
  256.         self.assertEqual(migration_loader.unmigrated_apps, {"migrated_app"})
    
  257. 
    
  258.     @override_settings(
    
  259.         INSTALLED_APPS=["migrations.migrations_test_apps.migrated_app"],
    
  260.         MIGRATION_MODULES={"migrated_app": "missing-module"},
    
  261.     )
    
  262.     def test_explicit_missing_module(self):
    
  263.         """
    
  264.         If a MIGRATION_MODULES override points to a missing module, the error
    
  265.         raised during the importation attempt should be propagated unless
    
  266.         `ignore_no_migrations=True`.
    
  267.         """
    
  268.         with self.assertRaisesMessage(ImportError, "missing-module"):
    
  269.             migration_loader = MigrationLoader(connection)
    
  270.         migration_loader = MigrationLoader(connection, ignore_no_migrations=True)
    
  271.         self.assertEqual(migration_loader.migrated_apps, set())
    
  272.         self.assertEqual(migration_loader.unmigrated_apps, {"migrated_app"})
    
  273. 
    
  274.     @override_settings(
    
  275.         MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed"}
    
  276.     )
    
  277.     def test_loading_squashed(self):
    
  278.         "Tests loading a squashed migration"
    
  279.         migration_loader = MigrationLoader(connection)
    
  280.         recorder = MigrationRecorder(connection)
    
  281.         self.addCleanup(recorder.flush)
    
  282.         # Loading with nothing applied should just give us the one node
    
  283.         self.assertEqual(
    
  284.             len([x for x in migration_loader.graph.nodes if x[0] == "migrations"]),
    
  285.             1,
    
  286.         )
    
  287.         # However, fake-apply one migration and it should now use the old two
    
  288.         self.record_applied(recorder, "migrations", "0001_initial")
    
  289.         migration_loader.build_graph()
    
  290.         self.assertEqual(
    
  291.             len([x for x in migration_loader.graph.nodes if x[0] == "migrations"]),
    
  292.             2,
    
  293.         )
    
  294. 
    
  295.     @override_settings(
    
  296.         MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed_complex"}
    
  297.     )
    
  298.     def test_loading_squashed_complex(self):
    
  299.         "Tests loading a complex set of squashed migrations"
    
  300. 
    
  301.         loader = MigrationLoader(connection)
    
  302.         recorder = MigrationRecorder(connection)
    
  303.         self.addCleanup(recorder.flush)
    
  304. 
    
  305.         def num_nodes():
    
  306.             plan = set(loader.graph.forwards_plan(("migrations", "7_auto")))
    
  307.             return len(plan - loader.applied_migrations.keys())
    
  308. 
    
  309.         # Empty database: use squashed migration
    
  310.         loader.build_graph()
    
  311.         self.assertEqual(num_nodes(), 5)
    
  312. 
    
  313.         # Starting at 1 or 2 should use the squashed migration too
    
  314.         self.record_applied(recorder, "migrations", "1_auto")
    
  315.         loader.build_graph()
    
  316.         self.assertEqual(num_nodes(), 4)
    
  317. 
    
  318.         self.record_applied(recorder, "migrations", "2_auto")
    
  319.         loader.build_graph()
    
  320.         self.assertEqual(num_nodes(), 3)
    
  321. 
    
  322.         # However, starting at 3 to 5 cannot use the squashed migration
    
  323.         self.record_applied(recorder, "migrations", "3_auto")
    
  324.         loader.build_graph()
    
  325.         self.assertEqual(num_nodes(), 4)
    
  326. 
    
  327.         self.record_applied(recorder, "migrations", "4_auto")
    
  328.         loader.build_graph()
    
  329.         self.assertEqual(num_nodes(), 3)
    
  330. 
    
  331.         # Starting at 5 to 7 we are past the squashed migrations.
    
  332.         self.record_applied(recorder, "migrations", "5_auto")
    
  333.         loader.build_graph()
    
  334.         self.assertEqual(num_nodes(), 2)
    
  335. 
    
  336.         self.record_applied(recorder, "migrations", "6_auto")
    
  337.         loader.build_graph()
    
  338.         self.assertEqual(num_nodes(), 1)
    
  339. 
    
  340.         self.record_applied(recorder, "migrations", "7_auto")
    
  341.         loader.build_graph()
    
  342.         self.assertEqual(num_nodes(), 0)
    
  343. 
    
  344.     @override_settings(
    
  345.         MIGRATION_MODULES={
    
  346.             "app1": "migrations.test_migrations_squashed_complex_multi_apps.app1",
    
  347.             "app2": "migrations.test_migrations_squashed_complex_multi_apps.app2",
    
  348.         }
    
  349.     )
    
  350.     @modify_settings(
    
  351.         INSTALLED_APPS={
    
  352.             "append": [
    
  353.                 "migrations.test_migrations_squashed_complex_multi_apps.app1",
    
  354.                 "migrations.test_migrations_squashed_complex_multi_apps.app2",
    
  355.             ]
    
  356.         }
    
  357.     )
    
  358.     def test_loading_squashed_complex_multi_apps(self):
    
  359.         loader = MigrationLoader(connection)
    
  360.         loader.build_graph()
    
  361. 
    
  362.         plan = set(loader.graph.forwards_plan(("app1", "4_auto")))
    
  363.         expected_plan = {
    
  364.             ("app1", "1_auto"),
    
  365.             ("app2", "1_squashed_2"),
    
  366.             ("app1", "2_squashed_3"),
    
  367.             ("app1", "4_auto"),
    
  368.         }
    
  369.         self.assertEqual(plan, expected_plan)
    
  370. 
    
  371.     @override_settings(
    
  372.         MIGRATION_MODULES={
    
  373.             "app1": "migrations.test_migrations_squashed_complex_multi_apps.app1",
    
  374.             "app2": "migrations.test_migrations_squashed_complex_multi_apps.app2",
    
  375.         }
    
  376.     )
    
  377.     @modify_settings(
    
  378.         INSTALLED_APPS={
    
  379.             "append": [
    
  380.                 "migrations.test_migrations_squashed_complex_multi_apps.app1",
    
  381.                 "migrations.test_migrations_squashed_complex_multi_apps.app2",
    
  382.             ]
    
  383.         }
    
  384.     )
    
  385.     def test_loading_squashed_complex_multi_apps_partially_applied(self):
    
  386.         loader = MigrationLoader(connection)
    
  387.         recorder = MigrationRecorder(connection)
    
  388.         self.record_applied(recorder, "app1", "1_auto")
    
  389.         self.record_applied(recorder, "app1", "2_auto")
    
  390.         loader.build_graph()
    
  391. 
    
  392.         plan = set(loader.graph.forwards_plan(("app1", "4_auto")))
    
  393.         plan = plan - loader.applied_migrations.keys()
    
  394.         expected_plan = {
    
  395.             ("app2", "1_squashed_2"),
    
  396.             ("app1", "3_auto"),
    
  397.             ("app1", "4_auto"),
    
  398.         }
    
  399. 
    
  400.         self.assertEqual(plan, expected_plan)
    
  401. 
    
  402.     @override_settings(
    
  403.         MIGRATION_MODULES={
    
  404.             "migrations": "migrations.test_migrations_squashed_erroneous"
    
  405.         }
    
  406.     )
    
  407.     def test_loading_squashed_erroneous(self):
    
  408.         "Tests loading a complex but erroneous set of squashed migrations"
    
  409. 
    
  410.         loader = MigrationLoader(connection)
    
  411.         recorder = MigrationRecorder(connection)
    
  412.         self.addCleanup(recorder.flush)
    
  413. 
    
  414.         def num_nodes():
    
  415.             plan = set(loader.graph.forwards_plan(("migrations", "7_auto")))
    
  416.             return len(plan - loader.applied_migrations.keys())
    
  417. 
    
  418.         # Empty database: use squashed migration
    
  419.         loader.build_graph()
    
  420.         self.assertEqual(num_nodes(), 5)
    
  421. 
    
  422.         # Starting at 1 or 2 should use the squashed migration too
    
  423.         self.record_applied(recorder, "migrations", "1_auto")
    
  424.         loader.build_graph()
    
  425.         self.assertEqual(num_nodes(), 4)
    
  426. 
    
  427.         self.record_applied(recorder, "migrations", "2_auto")
    
  428.         loader.build_graph()
    
  429.         self.assertEqual(num_nodes(), 3)
    
  430. 
    
  431.         # However, starting at 3 or 4, nonexistent migrations would be needed.
    
  432.         msg = (
    
  433.             "Migration migrations.6_auto depends on nonexistent node "
    
  434.             "('migrations', '5_auto'). Django tried to replace migration "
    
  435.             "migrations.5_auto with any of [migrations.3_squashed_5] but wasn't able "
    
  436.             "to because some of the replaced migrations are already applied."
    
  437.         )
    
  438. 
    
  439.         self.record_applied(recorder, "migrations", "3_auto")
    
  440.         with self.assertRaisesMessage(NodeNotFoundError, msg):
    
  441.             loader.build_graph()
    
  442. 
    
  443.         self.record_applied(recorder, "migrations", "4_auto")
    
  444.         with self.assertRaisesMessage(NodeNotFoundError, msg):
    
  445.             loader.build_graph()
    
  446. 
    
  447.         # Starting at 5 to 7 we are passed the squashed migrations
    
  448.         self.record_applied(recorder, "migrations", "5_auto")
    
  449.         loader.build_graph()
    
  450.         self.assertEqual(num_nodes(), 2)
    
  451. 
    
  452.         self.record_applied(recorder, "migrations", "6_auto")
    
  453.         loader.build_graph()
    
  454.         self.assertEqual(num_nodes(), 1)
    
  455. 
    
  456.         self.record_applied(recorder, "migrations", "7_auto")
    
  457.         loader.build_graph()
    
  458.         self.assertEqual(num_nodes(), 0)
    
  459. 
    
  460.     @override_settings(
    
  461.         MIGRATION_MODULES={"migrations": "migrations.test_migrations"},
    
  462.         INSTALLED_APPS=["migrations"],
    
  463.     )
    
  464.     def test_check_consistent_history(self):
    
  465.         loader = MigrationLoader(connection=None)
    
  466.         loader.check_consistent_history(connection)
    
  467.         recorder = MigrationRecorder(connection)
    
  468.         self.record_applied(recorder, "migrations", "0002_second")
    
  469.         msg = (
    
  470.             "Migration migrations.0002_second is applied before its dependency "
    
  471.             "migrations.0001_initial on database 'default'."
    
  472.         )
    
  473.         with self.assertRaisesMessage(InconsistentMigrationHistory, msg):
    
  474.             loader.check_consistent_history(connection)
    
  475. 
    
  476.     @override_settings(
    
  477.         MIGRATION_MODULES={"migrations": "migrations.test_migrations_squashed_extra"},
    
  478.         INSTALLED_APPS=["migrations"],
    
  479.     )
    
  480.     def test_check_consistent_history_squashed(self):
    
  481.         """
    
  482.         MigrationLoader.check_consistent_history() should ignore unapplied
    
  483.         squashed migrations that have all of their `replaces` applied.
    
  484.         """
    
  485.         loader = MigrationLoader(connection=None)
    
  486.         recorder = MigrationRecorder(connection)
    
  487.         self.record_applied(recorder, "migrations", "0001_initial")
    
  488.         self.record_applied(recorder, "migrations", "0002_second")
    
  489.         loader.check_consistent_history(connection)
    
  490.         self.record_applied(recorder, "migrations", "0003_third")
    
  491.         loader.check_consistent_history(connection)
    
  492. 
    
  493.     @override_settings(
    
  494.         MIGRATION_MODULES={
    
  495.             "app1": "migrations.test_migrations_squashed_ref_squashed.app1",
    
  496.             "app2": "migrations.test_migrations_squashed_ref_squashed.app2",
    
  497.         }
    
  498.     )
    
  499.     @modify_settings(
    
  500.         INSTALLED_APPS={
    
  501.             "append": [
    
  502.                 "migrations.test_migrations_squashed_ref_squashed.app1",
    
  503.                 "migrations.test_migrations_squashed_ref_squashed.app2",
    
  504.             ]
    
  505.         }
    
  506.     )
    
  507.     def test_loading_squashed_ref_squashed(self):
    
  508.         "Tests loading a squashed migration with a new migration referencing it"
    
  509.         r"""
    
  510.         The sample migrations are structured like this:
    
  511. 
    
  512.         app_1       1 --> 2 ---------------------*--> 3        *--> 4
    
  513.                      \                          /             /
    
  514.                       *-------------------*----/--> 2_sq_3 --*
    
  515.                        \                 /    /
    
  516.         =============== \ ============= / == / ======================
    
  517.         app_2            *--> 1_sq_2 --*    /
    
  518.                           \                /
    
  519.                            *--> 1 --> 2 --*
    
  520. 
    
  521.         Where 2_sq_3 is a replacing migration for 2 and 3 in app_1,
    
  522.         as 1_sq_2 is a replacing migration for 1 and 2 in app_2.
    
  523.         """
    
  524. 
    
  525.         loader = MigrationLoader(connection)
    
  526.         recorder = MigrationRecorder(connection)
    
  527.         self.addCleanup(recorder.flush)
    
  528. 
    
  529.         # Load with nothing applied: both migrations squashed.
    
  530.         loader.build_graph()
    
  531.         plan = set(loader.graph.forwards_plan(("app1", "4_auto")))
    
  532.         plan = plan - loader.applied_migrations.keys()
    
  533.         expected_plan = {
    
  534.             ("app1", "1_auto"),
    
  535.             ("app2", "1_squashed_2"),
    
  536.             ("app1", "2_squashed_3"),
    
  537.             ("app1", "4_auto"),
    
  538.         }
    
  539.         self.assertEqual(plan, expected_plan)
    
  540. 
    
  541.         # Load with nothing applied and migrate to a replaced migration.
    
  542.         # Not possible if loader.replace_migrations is True (default).
    
  543.         loader.build_graph()
    
  544.         msg = "Node ('app1', '3_auto') not a valid node"
    
  545.         with self.assertRaisesMessage(NodeNotFoundError, msg):
    
  546.             loader.graph.forwards_plan(("app1", "3_auto"))
    
  547.         # Possible if loader.replace_migrations is False.
    
  548.         loader.replace_migrations = False
    
  549.         loader.build_graph()
    
  550.         plan = set(loader.graph.forwards_plan(("app1", "3_auto")))
    
  551.         plan = plan - loader.applied_migrations.keys()
    
  552.         expected_plan = {
    
  553.             ("app1", "1_auto"),
    
  554.             ("app2", "1_auto"),
    
  555.             ("app2", "2_auto"),
    
  556.             ("app1", "2_auto"),
    
  557.             ("app1", "3_auto"),
    
  558.         }
    
  559.         self.assertEqual(plan, expected_plan)
    
  560.         loader.replace_migrations = True
    
  561. 
    
  562.         # Fake-apply a few from app1: unsquashes migration in app1.
    
  563.         self.record_applied(recorder, "app1", "1_auto")
    
  564.         self.record_applied(recorder, "app1", "2_auto")
    
  565.         loader.build_graph()
    
  566.         plan = set(loader.graph.forwards_plan(("app1", "4_auto")))
    
  567.         plan = plan - loader.applied_migrations.keys()
    
  568.         expected_plan = {
    
  569.             ("app2", "1_squashed_2"),
    
  570.             ("app1", "3_auto"),
    
  571.             ("app1", "4_auto"),
    
  572.         }
    
  573.         self.assertEqual(plan, expected_plan)
    
  574. 
    
  575.         # Fake-apply one from app2: unsquashes migration in app2 too.
    
  576.         self.record_applied(recorder, "app2", "1_auto")
    
  577.         loader.build_graph()
    
  578.         plan = set(loader.graph.forwards_plan(("app1", "4_auto")))
    
  579.         plan = plan - loader.applied_migrations.keys()
    
  580.         expected_plan = {
    
  581.             ("app2", "2_auto"),
    
  582.             ("app1", "3_auto"),
    
  583.             ("app1", "4_auto"),
    
  584.         }
    
  585.         self.assertEqual(plan, expected_plan)
    
  586. 
    
  587.     @override_settings(
    
  588.         MIGRATION_MODULES={"migrations": "migrations.test_migrations_private"}
    
  589.     )
    
  590.     def test_ignore_files(self):
    
  591.         """Files prefixed with underscore, tilde, or dot aren't loaded."""
    
  592.         loader = MigrationLoader(connection)
    
  593.         loader.load_disk()
    
  594.         migrations = [
    
  595.             name for app, name in loader.disk_migrations if app == "migrations"
    
  596.         ]
    
  597.         self.assertEqual(migrations, ["0001_initial"])
    
  598. 
    
  599.     @override_settings(
    
  600.         MIGRATION_MODULES={
    
  601.             "migrations": "migrations.test_migrations_namespace_package"
    
  602.         },
    
  603.     )
    
  604.     def test_loading_namespace_package(self):
    
  605.         """Migration directories without an __init__.py file are ignored."""
    
  606.         loader = MigrationLoader(connection)
    
  607.         loader.load_disk()
    
  608.         migrations = [
    
  609.             name for app, name in loader.disk_migrations if app == "migrations"
    
  610.         ]
    
  611.         self.assertEqual(migrations, [])
    
  612. 
    
  613.     @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"})
    
  614.     def test_loading_package_without__file__(self):
    
  615.         """
    
  616.         To support frozen environments, MigrationLoader loads migrations from
    
  617.         regular packages with no __file__ attribute.
    
  618.         """
    
  619.         test_module = import_module("migrations.test_migrations")
    
  620.         loader = MigrationLoader(connection)
    
  621.         # __file__ == __spec__.origin or the latter is None and former is
    
  622.         # undefined.
    
  623.         module_file = test_module.__file__
    
  624.         module_origin = test_module.__spec__.origin
    
  625.         module_has_location = test_module.__spec__.has_location
    
  626.         try:
    
  627.             del test_module.__file__
    
  628.             test_module.__spec__.origin = None
    
  629.             test_module.__spec__.has_location = False
    
  630.             loader.load_disk()
    
  631.             migrations = [
    
  632.                 name for app, name in loader.disk_migrations if app == "migrations"
    
  633.             ]
    
  634.             self.assertCountEqual(migrations, ["0001_initial", "0002_second"])
    
  635.         finally:
    
  636.             test_module.__file__ = module_file
    
  637.             test_module.__spec__.origin = module_origin
    
  638.             test_module.__spec__.has_location = module_has_location
    
  639. 
    
  640. 
    
  641. class PycLoaderTests(MigrationTestBase):
    
  642.     def test_valid(self):
    
  643.         """
    
  644.         To support frozen environments, MigrationLoader loads .pyc migrations.
    
  645.         """
    
  646.         with self.temporary_migration_module(
    
  647.             module="migrations.test_migrations"
    
  648.         ) as migration_dir:
    
  649.             # Compile .py files to .pyc files and delete .py files.
    
  650.             compileall.compile_dir(migration_dir, force=True, quiet=1, legacy=True)
    
  651.             for name in os.listdir(migration_dir):
    
  652.                 if name.endswith(".py"):
    
  653.                     os.remove(os.path.join(migration_dir, name))
    
  654.             loader = MigrationLoader(connection)
    
  655.             self.assertIn(("migrations", "0001_initial"), loader.disk_migrations)
    
  656. 
    
  657.     def test_invalid(self):
    
  658.         """
    
  659.         MigrationLoader reraises ImportErrors caused by "bad magic number" pyc
    
  660.         files with a more helpful message.
    
  661.         """
    
  662.         with self.temporary_migration_module(
    
  663.             module="migrations.test_migrations_bad_pyc"
    
  664.         ) as migration_dir:
    
  665.             # The -tpl suffix is to avoid the pyc exclusion in MANIFEST.in.
    
  666.             os.rename(
    
  667.                 os.path.join(migration_dir, "0001_initial.pyc-tpl"),
    
  668.                 os.path.join(migration_dir, "0001_initial.pyc"),
    
  669.             )
    
  670.             msg = (
    
  671.                 r"Couldn't import '\w+.migrations.0001_initial' as it appears "
    
  672.                 "to be a stale .pyc file."
    
  673.             )
    
  674.             with self.assertRaisesRegex(ImportError, msg):
    
  675.                 MigrationLoader(connection)