1. import contextlib
    
  2. import os
    
  3. import py_compile
    
  4. import shutil
    
  5. import sys
    
  6. import tempfile
    
  7. import threading
    
  8. import time
    
  9. import types
    
  10. import weakref
    
  11. import zipfile
    
  12. from importlib import import_module
    
  13. from pathlib import Path
    
  14. from subprocess import CompletedProcess
    
  15. from unittest import mock, skip, skipIf
    
  16. 
    
  17. try:
    
  18.     import zoneinfo
    
  19. except ImportError:
    
  20.     from backports import zoneinfo
    
  21. 
    
  22. import django.__main__
    
  23. from django.apps.registry import Apps
    
  24. from django.test import SimpleTestCase
    
  25. from django.test.utils import extend_sys_path
    
  26. from django.utils import autoreload
    
  27. from django.utils.autoreload import WatchmanUnavailable
    
  28. 
    
  29. from .test_module import __main__ as test_main
    
  30. from .test_module import main_module as test_main_module
    
  31. from .utils import on_macos_with_hfs
    
  32. 
    
  33. 
    
  34. class TestIterModulesAndFiles(SimpleTestCase):
    
  35.     def import_and_cleanup(self, name):
    
  36.         import_module(name)
    
  37.         self.addCleanup(lambda: sys.path_importer_cache.clear())
    
  38.         self.addCleanup(lambda: sys.modules.pop(name, None))
    
  39. 
    
  40.     def clear_autoreload_caches(self):
    
  41.         autoreload.iter_modules_and_files.cache_clear()
    
  42. 
    
  43.     def assertFileFound(self, filename):
    
  44.         # Some temp directories are symlinks. Python resolves these fully while
    
  45.         # importing.
    
  46.         resolved_filename = filename.resolve(strict=True)
    
  47.         self.clear_autoreload_caches()
    
  48.         # Test uncached access
    
  49.         self.assertIn(
    
  50.             resolved_filename, list(autoreload.iter_all_python_module_files())
    
  51.         )
    
  52.         # Test cached access
    
  53.         self.assertIn(
    
  54.             resolved_filename, list(autoreload.iter_all_python_module_files())
    
  55.         )
    
  56.         self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)
    
  57. 
    
  58.     def assertFileNotFound(self, filename):
    
  59.         resolved_filename = filename.resolve(strict=True)
    
  60.         self.clear_autoreload_caches()
    
  61.         # Test uncached access
    
  62.         self.assertNotIn(
    
  63.             resolved_filename, list(autoreload.iter_all_python_module_files())
    
  64.         )
    
  65.         # Test cached access
    
  66.         self.assertNotIn(
    
  67.             resolved_filename, list(autoreload.iter_all_python_module_files())
    
  68.         )
    
  69.         self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)
    
  70. 
    
  71.     def temporary_file(self, filename):
    
  72.         dirname = tempfile.mkdtemp()
    
  73.         self.addCleanup(shutil.rmtree, dirname)
    
  74.         return Path(dirname) / filename
    
  75. 
    
  76.     def test_paths_are_pathlib_instances(self):
    
  77.         for filename in autoreload.iter_all_python_module_files():
    
  78.             self.assertIsInstance(filename, Path)
    
  79. 
    
  80.     def test_file_added(self):
    
  81.         """
    
  82.         When a file is added, it's returned by iter_all_python_module_files().
    
  83.         """
    
  84.         filename = self.temporary_file("test_deleted_removed_module.py")
    
  85.         filename.touch()
    
  86. 
    
  87.         with extend_sys_path(str(filename.parent)):
    
  88.             self.import_and_cleanup("test_deleted_removed_module")
    
  89. 
    
  90.         self.assertFileFound(filename.absolute())
    
  91. 
    
  92.     def test_check_errors(self):
    
  93.         """
    
  94.         When a file containing an error is imported in a function wrapped by
    
  95.         check_errors(), gen_filenames() returns it.
    
  96.         """
    
  97.         filename = self.temporary_file("test_syntax_error.py")
    
  98.         filename.write_text("Ceci n'est pas du Python.")
    
  99. 
    
  100.         with extend_sys_path(str(filename.parent)):
    
  101.             try:
    
  102.                 with self.assertRaises(SyntaxError):
    
  103.                     autoreload.check_errors(import_module)("test_syntax_error")
    
  104.             finally:
    
  105.                 autoreload._exception = None
    
  106.         self.assertFileFound(filename)
    
  107. 
    
  108.     def test_check_errors_catches_all_exceptions(self):
    
  109.         """
    
  110.         Since Python may raise arbitrary exceptions when importing code,
    
  111.         check_errors() must catch Exception, not just some subclasses.
    
  112.         """
    
  113.         filename = self.temporary_file("test_exception.py")
    
  114.         filename.write_text("raise Exception")
    
  115.         with extend_sys_path(str(filename.parent)):
    
  116.             try:
    
  117.                 with self.assertRaises(Exception):
    
  118.                     autoreload.check_errors(import_module)("test_exception")
    
  119.             finally:
    
  120.                 autoreload._exception = None
    
  121.         self.assertFileFound(filename)
    
  122. 
    
  123.     def test_zip_reload(self):
    
  124.         """
    
  125.         Modules imported from zipped files have their archive location included
    
  126.         in the result.
    
  127.         """
    
  128.         zip_file = self.temporary_file("zip_import.zip")
    
  129.         with zipfile.ZipFile(str(zip_file), "w", zipfile.ZIP_DEFLATED) as zipf:
    
  130.             zipf.writestr("test_zipped_file.py", "")
    
  131. 
    
  132.         with extend_sys_path(str(zip_file)):
    
  133.             self.import_and_cleanup("test_zipped_file")
    
  134.         self.assertFileFound(zip_file)
    
  135. 
    
  136.     def test_bytecode_conversion_to_source(self):
    
  137.         """.pyc and .pyo files are included in the files list."""
    
  138.         filename = self.temporary_file("test_compiled.py")
    
  139.         filename.touch()
    
  140.         compiled_file = Path(
    
  141.             py_compile.compile(str(filename), str(filename.with_suffix(".pyc")))
    
  142.         )
    
  143.         filename.unlink()
    
  144.         with extend_sys_path(str(compiled_file.parent)):
    
  145.             self.import_and_cleanup("test_compiled")
    
  146.         self.assertFileFound(compiled_file)
    
  147. 
    
  148.     def test_weakref_in_sys_module(self):
    
  149.         """iter_all_python_module_file() ignores weakref modules."""
    
  150.         time_proxy = weakref.proxy(time)
    
  151.         sys.modules["time_proxy"] = time_proxy
    
  152.         self.addCleanup(lambda: sys.modules.pop("time_proxy", None))
    
  153.         list(autoreload.iter_all_python_module_files())  # No crash.
    
  154. 
    
  155.     def test_module_without_spec(self):
    
  156.         module = types.ModuleType("test_module")
    
  157.         del module.__spec__
    
  158.         self.assertEqual(
    
  159.             autoreload.iter_modules_and_files((module,), frozenset()), frozenset()
    
  160.         )
    
  161. 
    
  162.     def test_main_module_is_resolved(self):
    
  163.         main_module = sys.modules["__main__"]
    
  164.         self.assertFileFound(Path(main_module.__file__))
    
  165. 
    
  166.     def test_main_module_without_file_is_not_resolved(self):
    
  167.         fake_main = types.ModuleType("__main__")
    
  168.         self.assertEqual(
    
  169.             autoreload.iter_modules_and_files((fake_main,), frozenset()), frozenset()
    
  170.         )
    
  171. 
    
  172.     def test_path_with_embedded_null_bytes(self):
    
  173.         for path in (
    
  174.             "embedded_null_byte\x00.py",
    
  175.             "di\x00rectory/embedded_null_byte.py",
    
  176.         ):
    
  177.             with self.subTest(path=path):
    
  178.                 self.assertEqual(
    
  179.                     autoreload.iter_modules_and_files((), frozenset([path])),
    
  180.                     frozenset(),
    
  181.                 )
    
  182. 
    
  183. 
    
  184. class TestChildArguments(SimpleTestCase):
    
  185.     @mock.patch.dict(sys.modules, {"__main__": django.__main__})
    
  186.     @mock.patch("sys.argv", [django.__main__.__file__, "runserver"])
    
  187.     @mock.patch("sys.warnoptions", [])
    
  188.     @mock.patch("sys._xoptions", {})
    
  189.     def test_run_as_module(self):
    
  190.         self.assertEqual(
    
  191.             autoreload.get_child_arguments(),
    
  192.             [sys.executable, "-m", "django", "runserver"],
    
  193.         )
    
  194. 
    
  195.     @mock.patch.dict(sys.modules, {"__main__": test_main})
    
  196.     @mock.patch("sys.argv", [test_main.__file__, "runserver"])
    
  197.     @mock.patch("sys.warnoptions", [])
    
  198.     @mock.patch("sys._xoptions", {})
    
  199.     def test_run_as_non_django_module(self):
    
  200.         self.assertEqual(
    
  201.             autoreload.get_child_arguments(),
    
  202.             [sys.executable, "-m", "utils_tests.test_module", "runserver"],
    
  203.         )
    
  204. 
    
  205.     @mock.patch.dict(sys.modules, {"__main__": test_main_module})
    
  206.     @mock.patch("sys.argv", [test_main.__file__, "runserver"])
    
  207.     @mock.patch("sys.warnoptions", [])
    
  208.     @mock.patch("sys._xoptions", {})
    
  209.     def test_run_as_non_django_module_non_package(self):
    
  210.         self.assertEqual(
    
  211.             autoreload.get_child_arguments(),
    
  212.             [sys.executable, "-m", "utils_tests.test_module.main_module", "runserver"],
    
  213.         )
    
  214. 
    
  215.     @mock.patch("__main__.__spec__", None)
    
  216.     @mock.patch("sys.argv", [__file__, "runserver"])
    
  217.     @mock.patch("sys.warnoptions", ["error"])
    
  218.     @mock.patch("sys._xoptions", {})
    
  219.     def test_warnoptions(self):
    
  220.         self.assertEqual(
    
  221.             autoreload.get_child_arguments(),
    
  222.             [sys.executable, "-Werror", __file__, "runserver"],
    
  223.         )
    
  224. 
    
  225.     @mock.patch("sys.argv", [__file__, "runserver"])
    
  226.     @mock.patch("sys.warnoptions", [])
    
  227.     @mock.patch("sys._xoptions", {"utf8": True, "a": "b"})
    
  228.     def test_xoptions(self):
    
  229.         self.assertEqual(
    
  230.             autoreload.get_child_arguments(),
    
  231.             [sys.executable, "-Xutf8", "-Xa=b", __file__, "runserver"],
    
  232.         )
    
  233. 
    
  234.     @mock.patch("__main__.__spec__", None)
    
  235.     @mock.patch("sys.warnoptions", [])
    
  236.     def test_exe_fallback(self):
    
  237.         with tempfile.TemporaryDirectory() as tmpdir:
    
  238.             exe_path = Path(tmpdir) / "django-admin.exe"
    
  239.             exe_path.touch()
    
  240.             with mock.patch("sys.argv", [exe_path.with_suffix(""), "runserver"]):
    
  241.                 self.assertEqual(
    
  242.                     autoreload.get_child_arguments(), [exe_path, "runserver"]
    
  243.                 )
    
  244. 
    
  245.     @mock.patch("__main__.__spec__", None)
    
  246.     @mock.patch("sys.warnoptions", [])
    
  247.     @mock.patch("sys._xoptions", {})
    
  248.     def test_entrypoint_fallback(self):
    
  249.         with tempfile.TemporaryDirectory() as tmpdir:
    
  250.             script_path = Path(tmpdir) / "django-admin-script.py"
    
  251.             script_path.touch()
    
  252.             with mock.patch(
    
  253.                 "sys.argv", [script_path.with_name("django-admin"), "runserver"]
    
  254.             ):
    
  255.                 self.assertEqual(
    
  256.                     autoreload.get_child_arguments(),
    
  257.                     [sys.executable, script_path, "runserver"],
    
  258.                 )
    
  259. 
    
  260.     @mock.patch("__main__.__spec__", None)
    
  261.     @mock.patch("sys.argv", ["does-not-exist", "runserver"])
    
  262.     @mock.patch("sys.warnoptions", [])
    
  263.     def test_raises_runtimeerror(self):
    
  264.         msg = "Script does-not-exist does not exist."
    
  265.         with self.assertRaisesMessage(RuntimeError, msg):
    
  266.             autoreload.get_child_arguments()
    
  267. 
    
  268.     @mock.patch("sys.argv", [__file__, "runserver"])
    
  269.     @mock.patch("sys.warnoptions", [])
    
  270.     @mock.patch("sys._xoptions", {})
    
  271.     def test_module_no_spec(self):
    
  272.         module = types.ModuleType("test_module")
    
  273.         del module.__spec__
    
  274.         with mock.patch.dict(sys.modules, {"__main__": module}):
    
  275.             self.assertEqual(
    
  276.                 autoreload.get_child_arguments(),
    
  277.                 [sys.executable, __file__, "runserver"],
    
  278.             )
    
  279. 
    
  280. 
    
  281. class TestUtilities(SimpleTestCase):
    
  282.     def test_is_django_module(self):
    
  283.         for module, expected in ((zoneinfo, False), (sys, False), (autoreload, True)):
    
  284.             with self.subTest(module=module):
    
  285.                 self.assertIs(autoreload.is_django_module(module), expected)
    
  286. 
    
  287.     def test_is_django_path(self):
    
  288.         for module, expected in (
    
  289.             (zoneinfo.__file__, False),
    
  290.             (contextlib.__file__, False),
    
  291.             (autoreload.__file__, True),
    
  292.         ):
    
  293.             with self.subTest(module=module):
    
  294.                 self.assertIs(autoreload.is_django_path(module), expected)
    
  295. 
    
  296. 
    
  297. class TestCommonRoots(SimpleTestCase):
    
  298.     def test_common_roots(self):
    
  299.         paths = (
    
  300.             Path("/first/second"),
    
  301.             Path("/first/second/third"),
    
  302.             Path("/first/"),
    
  303.             Path("/root/first/"),
    
  304.         )
    
  305.         results = autoreload.common_roots(paths)
    
  306.         self.assertCountEqual(results, [Path("/first/"), Path("/root/first/")])
    
  307. 
    
  308. 
    
  309. class TestSysPathDirectories(SimpleTestCase):
    
  310.     def setUp(self):
    
  311.         self._directory = tempfile.TemporaryDirectory()
    
  312.         self.directory = Path(self._directory.name).resolve(strict=True).absolute()
    
  313.         self.file = self.directory / "test"
    
  314.         self.file.touch()
    
  315. 
    
  316.     def tearDown(self):
    
  317.         self._directory.cleanup()
    
  318. 
    
  319.     def test_sys_paths_with_directories(self):
    
  320.         with extend_sys_path(str(self.file)):
    
  321.             paths = list(autoreload.sys_path_directories())
    
  322.         self.assertIn(self.file.parent, paths)
    
  323. 
    
  324.     def test_sys_paths_non_existing(self):
    
  325.         nonexistent_file = Path(self.directory.name) / "does_not_exist"
    
  326.         with extend_sys_path(str(nonexistent_file)):
    
  327.             paths = list(autoreload.sys_path_directories())
    
  328.         self.assertNotIn(nonexistent_file, paths)
    
  329.         self.assertNotIn(nonexistent_file.parent, paths)
    
  330. 
    
  331.     def test_sys_paths_absolute(self):
    
  332.         paths = list(autoreload.sys_path_directories())
    
  333.         self.assertTrue(all(p.is_absolute() for p in paths))
    
  334. 
    
  335.     def test_sys_paths_directories(self):
    
  336.         with extend_sys_path(str(self.directory)):
    
  337.             paths = list(autoreload.sys_path_directories())
    
  338.         self.assertIn(self.directory, paths)
    
  339. 
    
  340. 
    
  341. class GetReloaderTests(SimpleTestCase):
    
  342.     @mock.patch("django.utils.autoreload.WatchmanReloader")
    
  343.     def test_watchman_unavailable(self, mocked_watchman):
    
  344.         mocked_watchman.check_availability.side_effect = WatchmanUnavailable
    
  345.         self.assertIsInstance(autoreload.get_reloader(), autoreload.StatReloader)
    
  346. 
    
  347.     @mock.patch.object(autoreload.WatchmanReloader, "check_availability")
    
  348.     def test_watchman_available(self, mocked_available):
    
  349.         # If WatchmanUnavailable isn't raised, Watchman will be chosen.
    
  350.         mocked_available.return_value = None
    
  351.         result = autoreload.get_reloader()
    
  352.         self.assertIsInstance(result, autoreload.WatchmanReloader)
    
  353. 
    
  354. 
    
  355. class RunWithReloaderTests(SimpleTestCase):
    
  356.     @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: "true"})
    
  357.     @mock.patch("django.utils.autoreload.get_reloader")
    
  358.     def test_swallows_keyboard_interrupt(self, mocked_get_reloader):
    
  359.         mocked_get_reloader.side_effect = KeyboardInterrupt()
    
  360.         autoreload.run_with_reloader(lambda: None)  # No exception
    
  361. 
    
  362.     @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: "false"})
    
  363.     @mock.patch("django.utils.autoreload.restart_with_reloader")
    
  364.     def test_calls_sys_exit(self, mocked_restart_reloader):
    
  365.         mocked_restart_reloader.return_value = 1
    
  366.         with self.assertRaises(SystemExit) as exc:
    
  367.             autoreload.run_with_reloader(lambda: None)
    
  368.         self.assertEqual(exc.exception.code, 1)
    
  369. 
    
  370.     @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: "true"})
    
  371.     @mock.patch("django.utils.autoreload.start_django")
    
  372.     @mock.patch("django.utils.autoreload.get_reloader")
    
  373.     def test_calls_start_django(self, mocked_reloader, mocked_start_django):
    
  374.         mocked_reloader.return_value = mock.sentinel.RELOADER
    
  375.         autoreload.run_with_reloader(mock.sentinel.METHOD)
    
  376.         self.assertEqual(mocked_start_django.call_count, 1)
    
  377.         self.assertSequenceEqual(
    
  378.             mocked_start_django.call_args[0],
    
  379.             [mock.sentinel.RELOADER, mock.sentinel.METHOD],
    
  380.         )
    
  381. 
    
  382. 
    
  383. class StartDjangoTests(SimpleTestCase):
    
  384.     @mock.patch("django.utils.autoreload.StatReloader")
    
  385.     def test_watchman_becomes_unavailable(self, mocked_stat):
    
  386.         mocked_stat.should_stop.return_value = True
    
  387.         fake_reloader = mock.MagicMock()
    
  388.         fake_reloader.should_stop = False
    
  389.         fake_reloader.run.side_effect = autoreload.WatchmanUnavailable()
    
  390. 
    
  391.         autoreload.start_django(fake_reloader, lambda: None)
    
  392.         self.assertEqual(mocked_stat.call_count, 1)
    
  393. 
    
  394.     @mock.patch("django.utils.autoreload.ensure_echo_on")
    
  395.     def test_echo_on_called(self, mocked_echo):
    
  396.         fake_reloader = mock.MagicMock()
    
  397.         autoreload.start_django(fake_reloader, lambda: None)
    
  398.         self.assertEqual(mocked_echo.call_count, 1)
    
  399. 
    
  400.     @mock.patch("django.utils.autoreload.check_errors")
    
  401.     def test_check_errors_called(self, mocked_check_errors):
    
  402.         fake_method = mock.MagicMock(return_value=None)
    
  403.         fake_reloader = mock.MagicMock()
    
  404.         autoreload.start_django(fake_reloader, fake_method)
    
  405.         self.assertCountEqual(mocked_check_errors.call_args[0], [fake_method])
    
  406. 
    
  407.     @mock.patch("threading.Thread")
    
  408.     @mock.patch("django.utils.autoreload.check_errors")
    
  409.     def test_starts_thread_with_args(self, mocked_check_errors, mocked_thread):
    
  410.         fake_reloader = mock.MagicMock()
    
  411.         fake_main_func = mock.MagicMock()
    
  412.         fake_thread = mock.MagicMock()
    
  413.         mocked_check_errors.return_value = fake_main_func
    
  414.         mocked_thread.return_value = fake_thread
    
  415.         autoreload.start_django(fake_reloader, fake_main_func, 123, abc=123)
    
  416.         self.assertEqual(mocked_thread.call_count, 1)
    
  417.         self.assertEqual(
    
  418.             mocked_thread.call_args[1],
    
  419.             {
    
  420.                 "target": fake_main_func,
    
  421.                 "args": (123,),
    
  422.                 "kwargs": {"abc": 123},
    
  423.                 "name": "django-main-thread",
    
  424.             },
    
  425.         )
    
  426.         self.assertIs(fake_thread.daemon, True)
    
  427.         self.assertTrue(fake_thread.start.called)
    
  428. 
    
  429. 
    
  430. class TestCheckErrors(SimpleTestCase):
    
  431.     def test_mutates_error_files(self):
    
  432.         fake_method = mock.MagicMock(side_effect=RuntimeError())
    
  433.         wrapped = autoreload.check_errors(fake_method)
    
  434.         with mock.patch.object(autoreload, "_error_files") as mocked_error_files:
    
  435.             try:
    
  436.                 with self.assertRaises(RuntimeError):
    
  437.                     wrapped()
    
  438.             finally:
    
  439.                 autoreload._exception = None
    
  440.         self.assertEqual(mocked_error_files.append.call_count, 1)
    
  441. 
    
  442. 
    
  443. class TestRaiseLastException(SimpleTestCase):
    
  444.     @mock.patch("django.utils.autoreload._exception", None)
    
  445.     def test_no_exception(self):
    
  446.         # Should raise no exception if _exception is None
    
  447.         autoreload.raise_last_exception()
    
  448. 
    
  449.     def test_raises_exception(self):
    
  450.         class MyException(Exception):
    
  451.             pass
    
  452. 
    
  453.         # Create an exception
    
  454.         try:
    
  455.             raise MyException("Test Message")
    
  456.         except MyException:
    
  457.             exc_info = sys.exc_info()
    
  458. 
    
  459.         with mock.patch("django.utils.autoreload._exception", exc_info):
    
  460.             with self.assertRaisesMessage(MyException, "Test Message"):
    
  461.                 autoreload.raise_last_exception()
    
  462. 
    
  463.     def test_raises_custom_exception(self):
    
  464.         class MyException(Exception):
    
  465.             def __init__(self, msg, extra_context):
    
  466.                 super().__init__(msg)
    
  467.                 self.extra_context = extra_context
    
  468. 
    
  469.         # Create an exception.
    
  470.         try:
    
  471.             raise MyException("Test Message", "extra context")
    
  472.         except MyException:
    
  473.             exc_info = sys.exc_info()
    
  474. 
    
  475.         with mock.patch("django.utils.autoreload._exception", exc_info):
    
  476.             with self.assertRaisesMessage(MyException, "Test Message"):
    
  477.                 autoreload.raise_last_exception()
    
  478. 
    
  479.     def test_raises_exception_with_context(self):
    
  480.         try:
    
  481.             raise Exception(2)
    
  482.         except Exception as e:
    
  483.             try:
    
  484.                 raise Exception(1) from e
    
  485.             except Exception:
    
  486.                 exc_info = sys.exc_info()
    
  487. 
    
  488.         with mock.patch("django.utils.autoreload._exception", exc_info):
    
  489.             with self.assertRaises(Exception) as cm:
    
  490.                 autoreload.raise_last_exception()
    
  491.             self.assertEqual(cm.exception.args[0], 1)
    
  492.             self.assertEqual(cm.exception.__cause__.args[0], 2)
    
  493. 
    
  494. 
    
  495. class RestartWithReloaderTests(SimpleTestCase):
    
  496.     executable = "/usr/bin/python"
    
  497. 
    
  498.     def patch_autoreload(self, argv):
    
  499.         patch_call = mock.patch(
    
  500.             "django.utils.autoreload.subprocess.run",
    
  501.             return_value=CompletedProcess(argv, 0),
    
  502.         )
    
  503.         patches = [
    
  504.             mock.patch("django.utils.autoreload.sys.argv", argv),
    
  505.             mock.patch("django.utils.autoreload.sys.executable", self.executable),
    
  506.             mock.patch("django.utils.autoreload.sys.warnoptions", ["all"]),
    
  507.             mock.patch("django.utils.autoreload.sys._xoptions", {}),
    
  508.         ]
    
  509.         for p in patches:
    
  510.             p.start()
    
  511.             self.addCleanup(p.stop)
    
  512.         mock_call = patch_call.start()
    
  513.         self.addCleanup(patch_call.stop)
    
  514.         return mock_call
    
  515. 
    
  516.     def test_manage_py(self):
    
  517.         with tempfile.TemporaryDirectory() as temp_dir:
    
  518.             script = Path(temp_dir) / "manage.py"
    
  519.             script.touch()
    
  520.             argv = [str(script), "runserver"]
    
  521.             mock_call = self.patch_autoreload(argv)
    
  522.             with mock.patch("__main__.__spec__", None):
    
  523.                 autoreload.restart_with_reloader()
    
  524.             self.assertEqual(mock_call.call_count, 1)
    
  525.             self.assertEqual(
    
  526.                 mock_call.call_args[0][0],
    
  527.                 [self.executable, "-Wall"] + argv,
    
  528.             )
    
  529. 
    
  530.     def test_python_m_django(self):
    
  531.         main = "/usr/lib/pythonX.Y/site-packages/django/__main__.py"
    
  532.         argv = [main, "runserver"]
    
  533.         mock_call = self.patch_autoreload(argv)
    
  534.         with mock.patch("django.__main__.__file__", main):
    
  535.             with mock.patch.dict(sys.modules, {"__main__": django.__main__}):
    
  536.                 autoreload.restart_with_reloader()
    
  537.             self.assertEqual(mock_call.call_count, 1)
    
  538.             self.assertEqual(
    
  539.                 mock_call.call_args[0][0],
    
  540.                 [self.executable, "-Wall", "-m", "django"] + argv[1:],
    
  541.             )
    
  542. 
    
  543. 
    
  544. class ReloaderTests(SimpleTestCase):
    
  545.     RELOADER_CLS = None
    
  546. 
    
  547.     def setUp(self):
    
  548.         self._tempdir = tempfile.TemporaryDirectory()
    
  549.         self.tempdir = Path(self._tempdir.name).resolve(strict=True).absolute()
    
  550.         self.existing_file = self.ensure_file(self.tempdir / "test.py")
    
  551.         self.nonexistent_file = (self.tempdir / "does_not_exist.py").absolute()
    
  552.         self.reloader = self.RELOADER_CLS()
    
  553. 
    
  554.     def tearDown(self):
    
  555.         self._tempdir.cleanup()
    
  556.         self.reloader.stop()
    
  557. 
    
  558.     def ensure_file(self, path):
    
  559.         path.parent.mkdir(exist_ok=True, parents=True)
    
  560.         path.touch()
    
  561.         # On Linux and Windows updating the mtime of a file using touch() will
    
  562.         # set a timestamp value that is in the past, as the time value for the
    
  563.         # last kernel tick is used rather than getting the correct absolute
    
  564.         # time.
    
  565.         # To make testing simpler set the mtime to be the observed time when
    
  566.         # this function is called.
    
  567.         self.set_mtime(path, time.time())
    
  568.         return path.absolute()
    
  569. 
    
  570.     def set_mtime(self, fp, value):
    
  571.         os.utime(str(fp), (value, value))
    
  572. 
    
  573.     def increment_mtime(self, fp, by=1):
    
  574.         current_time = time.time()
    
  575.         self.set_mtime(fp, current_time + by)
    
  576. 
    
  577.     @contextlib.contextmanager
    
  578.     def tick_twice(self):
    
  579.         ticker = self.reloader.tick()
    
  580.         next(ticker)
    
  581.         yield
    
  582.         next(ticker)
    
  583. 
    
  584. 
    
  585. class IntegrationTests:
    
  586.     @mock.patch("django.utils.autoreload.BaseReloader.notify_file_changed")
    
  587.     @mock.patch(
    
  588.         "django.utils.autoreload.iter_all_python_module_files", return_value=frozenset()
    
  589.     )
    
  590.     def test_glob(self, mocked_modules, notify_mock):
    
  591.         non_py_file = self.ensure_file(self.tempdir / "non_py_file")
    
  592.         self.reloader.watch_dir(self.tempdir, "*.py")
    
  593.         with self.tick_twice():
    
  594.             self.increment_mtime(non_py_file)
    
  595.             self.increment_mtime(self.existing_file)
    
  596.         self.assertEqual(notify_mock.call_count, 1)
    
  597.         self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
    
  598. 
    
  599.     @mock.patch("django.utils.autoreload.BaseReloader.notify_file_changed")
    
  600.     @mock.patch(
    
  601.         "django.utils.autoreload.iter_all_python_module_files", return_value=frozenset()
    
  602.     )
    
  603.     def test_multiple_globs(self, mocked_modules, notify_mock):
    
  604.         self.ensure_file(self.tempdir / "x.test")
    
  605.         self.reloader.watch_dir(self.tempdir, "*.py")
    
  606.         self.reloader.watch_dir(self.tempdir, "*.test")
    
  607.         with self.tick_twice():
    
  608.             self.increment_mtime(self.existing_file)
    
  609.         self.assertEqual(notify_mock.call_count, 1)
    
  610.         self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
    
  611. 
    
  612.     @mock.patch("django.utils.autoreload.BaseReloader.notify_file_changed")
    
  613.     @mock.patch(
    
  614.         "django.utils.autoreload.iter_all_python_module_files", return_value=frozenset()
    
  615.     )
    
  616.     def test_overlapping_globs(self, mocked_modules, notify_mock):
    
  617.         self.reloader.watch_dir(self.tempdir, "*.py")
    
  618.         self.reloader.watch_dir(self.tempdir, "*.p*")
    
  619.         with self.tick_twice():
    
  620.             self.increment_mtime(self.existing_file)
    
  621.         self.assertEqual(notify_mock.call_count, 1)
    
  622.         self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
    
  623. 
    
  624.     @mock.patch("django.utils.autoreload.BaseReloader.notify_file_changed")
    
  625.     @mock.patch(
    
  626.         "django.utils.autoreload.iter_all_python_module_files", return_value=frozenset()
    
  627.     )
    
  628.     def test_glob_recursive(self, mocked_modules, notify_mock):
    
  629.         non_py_file = self.ensure_file(self.tempdir / "dir" / "non_py_file")
    
  630.         py_file = self.ensure_file(self.tempdir / "dir" / "file.py")
    
  631.         self.reloader.watch_dir(self.tempdir, "**/*.py")
    
  632.         with self.tick_twice():
    
  633.             self.increment_mtime(non_py_file)
    
  634.             self.increment_mtime(py_file)
    
  635.         self.assertEqual(notify_mock.call_count, 1)
    
  636.         self.assertCountEqual(notify_mock.call_args[0], [py_file])
    
  637. 
    
  638.     @mock.patch("django.utils.autoreload.BaseReloader.notify_file_changed")
    
  639.     @mock.patch(
    
  640.         "django.utils.autoreload.iter_all_python_module_files", return_value=frozenset()
    
  641.     )
    
  642.     def test_multiple_recursive_globs(self, mocked_modules, notify_mock):
    
  643.         non_py_file = self.ensure_file(self.tempdir / "dir" / "test.txt")
    
  644.         py_file = self.ensure_file(self.tempdir / "dir" / "file.py")
    
  645.         self.reloader.watch_dir(self.tempdir, "**/*.txt")
    
  646.         self.reloader.watch_dir(self.tempdir, "**/*.py")
    
  647.         with self.tick_twice():
    
  648.             self.increment_mtime(non_py_file)
    
  649.             self.increment_mtime(py_file)
    
  650.         self.assertEqual(notify_mock.call_count, 2)
    
  651.         self.assertCountEqual(
    
  652.             notify_mock.call_args_list, [mock.call(py_file), mock.call(non_py_file)]
    
  653.         )
    
  654. 
    
  655.     @mock.patch("django.utils.autoreload.BaseReloader.notify_file_changed")
    
  656.     @mock.patch(
    
  657.         "django.utils.autoreload.iter_all_python_module_files", return_value=frozenset()
    
  658.     )
    
  659.     def test_nested_glob_recursive(self, mocked_modules, notify_mock):
    
  660.         inner_py_file = self.ensure_file(self.tempdir / "dir" / "file.py")
    
  661.         self.reloader.watch_dir(self.tempdir, "**/*.py")
    
  662.         self.reloader.watch_dir(inner_py_file.parent, "**/*.py")
    
  663.         with self.tick_twice():
    
  664.             self.increment_mtime(inner_py_file)
    
  665.         self.assertEqual(notify_mock.call_count, 1)
    
  666.         self.assertCountEqual(notify_mock.call_args[0], [inner_py_file])
    
  667. 
    
  668.     @mock.patch("django.utils.autoreload.BaseReloader.notify_file_changed")
    
  669.     @mock.patch(
    
  670.         "django.utils.autoreload.iter_all_python_module_files", return_value=frozenset()
    
  671.     )
    
  672.     def test_overlapping_glob_recursive(self, mocked_modules, notify_mock):
    
  673.         py_file = self.ensure_file(self.tempdir / "dir" / "file.py")
    
  674.         self.reloader.watch_dir(self.tempdir, "**/*.p*")
    
  675.         self.reloader.watch_dir(self.tempdir, "**/*.py*")
    
  676.         with self.tick_twice():
    
  677.             self.increment_mtime(py_file)
    
  678.         self.assertEqual(notify_mock.call_count, 1)
    
  679.         self.assertCountEqual(notify_mock.call_args[0], [py_file])
    
  680. 
    
  681. 
    
  682. class BaseReloaderTests(ReloaderTests):
    
  683.     RELOADER_CLS = autoreload.BaseReloader
    
  684. 
    
  685.     def test_watch_dir_with_unresolvable_path(self):
    
  686.         path = Path("unresolvable_directory")
    
  687.         with mock.patch.object(Path, "absolute", side_effect=FileNotFoundError):
    
  688.             self.reloader.watch_dir(path, "**/*.mo")
    
  689.         self.assertEqual(list(self.reloader.directory_globs), [])
    
  690. 
    
  691.     def test_watch_with_glob(self):
    
  692.         self.reloader.watch_dir(self.tempdir, "*.py")
    
  693.         watched_files = list(self.reloader.watched_files())
    
  694.         self.assertIn(self.existing_file, watched_files)
    
  695. 
    
  696.     def test_watch_files_with_recursive_glob(self):
    
  697.         inner_file = self.ensure_file(self.tempdir / "test" / "test.py")
    
  698.         self.reloader.watch_dir(self.tempdir, "**/*.py")
    
  699.         watched_files = list(self.reloader.watched_files())
    
  700.         self.assertIn(self.existing_file, watched_files)
    
  701.         self.assertIn(inner_file, watched_files)
    
  702. 
    
  703.     def test_run_loop_catches_stopiteration(self):
    
  704.         def mocked_tick():
    
  705.             yield
    
  706. 
    
  707.         with mock.patch.object(self.reloader, "tick", side_effect=mocked_tick) as tick:
    
  708.             self.reloader.run_loop()
    
  709.         self.assertEqual(tick.call_count, 1)
    
  710. 
    
  711.     def test_run_loop_stop_and_return(self):
    
  712.         def mocked_tick(*args):
    
  713.             yield
    
  714.             self.reloader.stop()
    
  715.             return  # Raises StopIteration
    
  716. 
    
  717.         with mock.patch.object(self.reloader, "tick", side_effect=mocked_tick) as tick:
    
  718.             self.reloader.run_loop()
    
  719. 
    
  720.         self.assertEqual(tick.call_count, 1)
    
  721. 
    
  722.     def test_wait_for_apps_ready_checks_for_exception(self):
    
  723.         app_reg = Apps()
    
  724.         app_reg.ready_event.set()
    
  725.         # thread.is_alive() is False if it's not started.
    
  726.         dead_thread = threading.Thread()
    
  727.         self.assertFalse(self.reloader.wait_for_apps_ready(app_reg, dead_thread))
    
  728. 
    
  729.     def test_wait_for_apps_ready_without_exception(self):
    
  730.         app_reg = Apps()
    
  731.         app_reg.ready_event.set()
    
  732.         thread = mock.MagicMock()
    
  733.         thread.is_alive.return_value = True
    
  734.         self.assertTrue(self.reloader.wait_for_apps_ready(app_reg, thread))
    
  735. 
    
  736. 
    
  737. def skip_unless_watchman_available():
    
  738.     try:
    
  739.         autoreload.WatchmanReloader.check_availability()
    
  740.     except WatchmanUnavailable as e:
    
  741.         return skip("Watchman unavailable: %s" % e)
    
  742.     return lambda func: func
    
  743. 
    
  744. 
    
  745. @skip_unless_watchman_available()
    
  746. class WatchmanReloaderTests(ReloaderTests, IntegrationTests):
    
  747.     RELOADER_CLS = autoreload.WatchmanReloader
    
  748. 
    
  749.     def setUp(self):
    
  750.         super().setUp()
    
  751.         # Shorten the timeout to speed up tests.
    
  752.         self.reloader.client_timeout = int(os.environ.get("DJANGO_WATCHMAN_TIMEOUT", 2))
    
  753. 
    
  754.     def test_watch_glob_ignores_non_existing_directories_two_levels(self):
    
  755.         with mock.patch.object(self.reloader, "_subscribe") as mocked_subscribe:
    
  756.             self.reloader._watch_glob(self.tempdir / "does_not_exist" / "more", ["*"])
    
  757.         self.assertFalse(mocked_subscribe.called)
    
  758. 
    
  759.     def test_watch_glob_uses_existing_parent_directories(self):
    
  760.         with mock.patch.object(self.reloader, "_subscribe") as mocked_subscribe:
    
  761.             self.reloader._watch_glob(self.tempdir / "does_not_exist", ["*"])
    
  762.         self.assertSequenceEqual(
    
  763.             mocked_subscribe.call_args[0],
    
  764.             [
    
  765.                 self.tempdir,
    
  766.                 "glob-parent-does_not_exist:%s" % self.tempdir,
    
  767.                 ["anyof", ["match", "does_not_exist/*", "wholename"]],
    
  768.             ],
    
  769.         )
    
  770. 
    
  771.     def test_watch_glob_multiple_patterns(self):
    
  772.         with mock.patch.object(self.reloader, "_subscribe") as mocked_subscribe:
    
  773.             self.reloader._watch_glob(self.tempdir, ["*", "*.py"])
    
  774.         self.assertSequenceEqual(
    
  775.             mocked_subscribe.call_args[0],
    
  776.             [
    
  777.                 self.tempdir,
    
  778.                 "glob:%s" % self.tempdir,
    
  779.                 ["anyof", ["match", "*", "wholename"], ["match", "*.py", "wholename"]],
    
  780.             ],
    
  781.         )
    
  782. 
    
  783.     def test_watched_roots_contains_files(self):
    
  784.         paths = self.reloader.watched_roots([self.existing_file])
    
  785.         self.assertIn(self.existing_file.parent, paths)
    
  786. 
    
  787.     def test_watched_roots_contains_directory_globs(self):
    
  788.         self.reloader.watch_dir(self.tempdir, "*.py")
    
  789.         paths = self.reloader.watched_roots([])
    
  790.         self.assertIn(self.tempdir, paths)
    
  791. 
    
  792.     def test_watched_roots_contains_sys_path(self):
    
  793.         with extend_sys_path(str(self.tempdir)):
    
  794.             paths = self.reloader.watched_roots([])
    
  795.         self.assertIn(self.tempdir, paths)
    
  796. 
    
  797.     def test_check_server_status(self):
    
  798.         self.assertTrue(self.reloader.check_server_status())
    
  799. 
    
  800.     def test_check_server_status_raises_error(self):
    
  801.         with mock.patch.object(self.reloader.client, "query") as mocked_query:
    
  802.             mocked_query.side_effect = Exception()
    
  803.             with self.assertRaises(autoreload.WatchmanUnavailable):
    
  804.                 self.reloader.check_server_status()
    
  805. 
    
  806.     @mock.patch("pywatchman.client")
    
  807.     def test_check_availability(self, mocked_client):
    
  808.         mocked_client().capabilityCheck.side_effect = Exception()
    
  809.         with self.assertRaisesMessage(
    
  810.             WatchmanUnavailable, "Cannot connect to the watchman service"
    
  811.         ):
    
  812.             self.RELOADER_CLS.check_availability()
    
  813. 
    
  814.     @mock.patch("pywatchman.client")
    
  815.     def test_check_availability_lower_version(self, mocked_client):
    
  816.         mocked_client().capabilityCheck.return_value = {"version": "4.8.10"}
    
  817.         with self.assertRaisesMessage(
    
  818.             WatchmanUnavailable, "Watchman 4.9 or later is required."
    
  819.         ):
    
  820.             self.RELOADER_CLS.check_availability()
    
  821. 
    
  822.     def test_pywatchman_not_available(self):
    
  823.         with mock.patch.object(autoreload, "pywatchman") as mocked:
    
  824.             mocked.__bool__.return_value = False
    
  825.             with self.assertRaisesMessage(
    
  826.                 WatchmanUnavailable, "pywatchman not installed."
    
  827.             ):
    
  828.                 self.RELOADER_CLS.check_availability()
    
  829. 
    
  830.     def test_update_watches_raises_exceptions(self):
    
  831.         class TestException(Exception):
    
  832.             pass
    
  833. 
    
  834.         with mock.patch.object(self.reloader, "_update_watches") as mocked_watches:
    
  835.             with mock.patch.object(
    
  836.                 self.reloader, "check_server_status"
    
  837.             ) as mocked_server_status:
    
  838.                 mocked_watches.side_effect = TestException()
    
  839.                 mocked_server_status.return_value = True
    
  840.                 with self.assertRaises(TestException):
    
  841.                     self.reloader.update_watches()
    
  842.                 self.assertIsInstance(
    
  843.                     mocked_server_status.call_args[0][0], TestException
    
  844.                 )
    
  845. 
    
  846.     @mock.patch.dict(os.environ, {"DJANGO_WATCHMAN_TIMEOUT": "10"})
    
  847.     def test_setting_timeout_from_environment_variable(self):
    
  848.         self.assertEqual(self.RELOADER_CLS().client_timeout, 10)
    
  849. 
    
  850. 
    
  851. @skipIf(on_macos_with_hfs(), "These tests do not work with HFS+ as a filesystem")
    
  852. class StatReloaderTests(ReloaderTests, IntegrationTests):
    
  853.     RELOADER_CLS = autoreload.StatReloader
    
  854. 
    
  855.     def setUp(self):
    
  856.         super().setUp()
    
  857.         # Shorten the sleep time to speed up tests.
    
  858.         self.reloader.SLEEP_TIME = 0.01
    
  859. 
    
  860.     @mock.patch("django.utils.autoreload.StatReloader.notify_file_changed")
    
  861.     def test_tick_does_not_trigger_twice(self, mock_notify_file_changed):
    
  862.         with mock.patch.object(
    
  863.             self.reloader, "watched_files", return_value=[self.existing_file]
    
  864.         ):
    
  865.             ticker = self.reloader.tick()
    
  866.             next(ticker)
    
  867.             self.increment_mtime(self.existing_file)
    
  868.             next(ticker)
    
  869.             next(ticker)
    
  870.             self.assertEqual(mock_notify_file_changed.call_count, 1)
    
  871. 
    
  872.     def test_snapshot_files_ignores_missing_files(self):
    
  873.         with mock.patch.object(
    
  874.             self.reloader, "watched_files", return_value=[self.nonexistent_file]
    
  875.         ):
    
  876.             self.assertEqual(dict(self.reloader.snapshot_files()), {})
    
  877. 
    
  878.     def test_snapshot_files_updates(self):
    
  879.         with mock.patch.object(
    
  880.             self.reloader, "watched_files", return_value=[self.existing_file]
    
  881.         ):
    
  882.             snapshot1 = dict(self.reloader.snapshot_files())
    
  883.             self.assertIn(self.existing_file, snapshot1)
    
  884.             self.increment_mtime(self.existing_file)
    
  885.             snapshot2 = dict(self.reloader.snapshot_files())
    
  886.             self.assertNotEqual(
    
  887.                 snapshot1[self.existing_file], snapshot2[self.existing_file]
    
  888.             )
    
  889. 
    
  890.     def test_snapshot_files_with_duplicates(self):
    
  891.         with mock.patch.object(
    
  892.             self.reloader,
    
  893.             "watched_files",
    
  894.             return_value=[self.existing_file, self.existing_file],
    
  895.         ):
    
  896.             snapshot = list(self.reloader.snapshot_files())
    
  897.             self.assertEqual(len(snapshot), 1)
    
  898.             self.assertEqual(snapshot[0][0], self.existing_file)