1. import os
    
  2. from argparse import ArgumentDefaultsHelpFormatter
    
  3. from io import StringIO
    
  4. from unittest import mock
    
  5. 
    
  6. from admin_scripts.tests import AdminScriptTestCase
    
  7. 
    
  8. from django.apps import apps
    
  9. from django.core import management
    
  10. from django.core.checks import Tags
    
  11. from django.core.management import BaseCommand, CommandError, find_commands
    
  12. from django.core.management.utils import (
    
  13.     find_command,
    
  14.     get_random_secret_key,
    
  15.     is_ignored_path,
    
  16.     normalize_path_patterns,
    
  17.     popen_wrapper,
    
  18. )
    
  19. from django.db import connection
    
  20. from django.test import SimpleTestCase, override_settings
    
  21. from django.test.utils import captured_stderr, extend_sys_path
    
  22. from django.utils import translation
    
  23. 
    
  24. from .management.commands import dance
    
  25. 
    
  26. 
    
  27. # A minimal set of apps to avoid system checks running on all apps.
    
  28. @override_settings(
    
  29.     INSTALLED_APPS=[
    
  30.         "django.contrib.auth",
    
  31.         "django.contrib.contenttypes",
    
  32.         "user_commands",
    
  33.     ],
    
  34. )
    
  35. class CommandTests(SimpleTestCase):
    
  36.     def test_command(self):
    
  37.         out = StringIO()
    
  38.         management.call_command("dance", stdout=out)
    
  39.         self.assertIn("I don't feel like dancing Rock'n'Roll.\n", out.getvalue())
    
  40. 
    
  41.     def test_command_style(self):
    
  42.         out = StringIO()
    
  43.         management.call_command("dance", style="Jive", stdout=out)
    
  44.         self.assertIn("I don't feel like dancing Jive.\n", out.getvalue())
    
  45.         # Passing options as arguments also works (thanks argparse)
    
  46.         management.call_command("dance", "--style", "Jive", stdout=out)
    
  47.         self.assertIn("I don't feel like dancing Jive.\n", out.getvalue())
    
  48. 
    
  49.     def test_language_preserved(self):
    
  50.         with translation.override("fr"):
    
  51.             management.call_command("dance", verbosity=0)
    
  52.             self.assertEqual(translation.get_language(), "fr")
    
  53. 
    
  54.     def test_explode(self):
    
  55.         """An unknown command raises CommandError"""
    
  56.         with self.assertRaisesMessage(CommandError, "Unknown command: 'explode'"):
    
  57.             management.call_command(("explode",))
    
  58. 
    
  59.     def test_system_exit(self):
    
  60.         """Exception raised in a command should raise CommandError with
    
  61.         call_command, but SystemExit when run from command line
    
  62.         """
    
  63.         with self.assertRaises(CommandError) as cm:
    
  64.             management.call_command("dance", example="raise")
    
  65.         self.assertEqual(cm.exception.returncode, 3)
    
  66.         dance.Command.requires_system_checks = []
    
  67.         try:
    
  68.             with captured_stderr() as stderr, self.assertRaises(SystemExit) as cm:
    
  69.                 management.ManagementUtility(
    
  70.                     ["manage.py", "dance", "--example=raise"]
    
  71.                 ).execute()
    
  72.             self.assertEqual(cm.exception.code, 3)
    
  73.         finally:
    
  74.             dance.Command.requires_system_checks = "__all__"
    
  75.         self.assertIn("CommandError", stderr.getvalue())
    
  76. 
    
  77.     def test_no_translations_deactivate_translations(self):
    
  78.         """
    
  79.         When the Command handle method is decorated with @no_translations,
    
  80.         translations are deactivated inside the command.
    
  81.         """
    
  82.         current_locale = translation.get_language()
    
  83.         with translation.override("pl"):
    
  84.             result = management.call_command("no_translations")
    
  85.             self.assertIsNone(result)
    
  86.         self.assertEqual(translation.get_language(), current_locale)
    
  87. 
    
  88.     def test_find_command_without_PATH(self):
    
  89.         """
    
  90.         find_command should still work when the PATH environment variable
    
  91.         doesn't exist (#22256).
    
  92.         """
    
  93.         current_path = os.environ.pop("PATH", None)
    
  94. 
    
  95.         try:
    
  96.             self.assertIsNone(find_command("_missing_"))
    
  97.         finally:
    
  98.             if current_path is not None:
    
  99.                 os.environ["PATH"] = current_path
    
  100. 
    
  101.     def test_discover_commands_in_eggs(self):
    
  102.         """
    
  103.         Management commands can also be loaded from Python eggs.
    
  104.         """
    
  105.         egg_dir = "%s/eggs" % os.path.dirname(__file__)
    
  106.         egg_name = "%s/basic.egg" % egg_dir
    
  107.         with extend_sys_path(egg_name):
    
  108.             with self.settings(INSTALLED_APPS=["commandegg"]):
    
  109.                 cmds = find_commands(
    
  110.                     os.path.join(apps.get_app_config("commandegg").path, "management")
    
  111.                 )
    
  112.         self.assertEqual(cmds, ["eggcommand"])
    
  113. 
    
  114.     def test_call_command_option_parsing(self):
    
  115.         """
    
  116.         When passing the long option name to call_command, the available option
    
  117.         key is the option dest name (#22985).
    
  118.         """
    
  119.         out = StringIO()
    
  120.         management.call_command("dance", stdout=out, opt_3=True)
    
  121.         self.assertIn("option3", out.getvalue())
    
  122.         self.assertNotIn("opt_3", out.getvalue())
    
  123.         self.assertNotIn("opt-3", out.getvalue())
    
  124. 
    
  125.     def test_call_command_option_parsing_non_string_arg(self):
    
  126.         """
    
  127.         It should be possible to pass non-string arguments to call_command.
    
  128.         """
    
  129.         out = StringIO()
    
  130.         management.call_command("dance", 1, verbosity=0, stdout=out)
    
  131.         self.assertIn("You passed 1 as a positional argument.", out.getvalue())
    
  132. 
    
  133.     def test_calling_a_command_with_only_empty_parameter_should_ends_gracefully(self):
    
  134.         out = StringIO()
    
  135.         management.call_command("hal", "--empty", stdout=out)
    
  136.         self.assertEqual(out.getvalue(), "\nDave, I can't do that.\n")
    
  137. 
    
  138.     def test_calling_command_with_app_labels_and_parameters_should_be_ok(self):
    
  139.         out = StringIO()
    
  140.         management.call_command("hal", "myapp", "--verbosity", "3", stdout=out)
    
  141.         self.assertIn(
    
  142.             "Dave, my mind is going. I can feel it. I can feel it.\n", out.getvalue()
    
  143.         )
    
  144. 
    
  145.     def test_calling_command_with_parameters_and_app_labels_at_the_end_should_be_ok(
    
  146.         self,
    
  147.     ):
    
  148.         out = StringIO()
    
  149.         management.call_command("hal", "--verbosity", "3", "myapp", stdout=out)
    
  150.         self.assertIn(
    
  151.             "Dave, my mind is going. I can feel it. I can feel it.\n", out.getvalue()
    
  152.         )
    
  153. 
    
  154.     def test_calling_a_command_with_no_app_labels_and_parameters_raise_command_error(
    
  155.         self,
    
  156.     ):
    
  157.         with self.assertRaises(CommandError):
    
  158.             management.call_command("hal")
    
  159. 
    
  160.     def test_output_transaction(self):
    
  161.         output = management.call_command(
    
  162.             "transaction", stdout=StringIO(), no_color=True
    
  163.         )
    
  164.         self.assertTrue(
    
  165.             output.strip().startswith(connection.ops.start_transaction_sql())
    
  166.         )
    
  167.         self.assertTrue(output.strip().endswith(connection.ops.end_transaction_sql()))
    
  168. 
    
  169.     def test_call_command_no_checks(self):
    
  170.         """
    
  171.         By default, call_command should not trigger the check framework, unless
    
  172.         specifically asked.
    
  173.         """
    
  174.         self.counter = 0
    
  175. 
    
  176.         def patched_check(self_, **kwargs):
    
  177.             self.counter += 1
    
  178.             self.kwargs = kwargs
    
  179. 
    
  180.         saved_check = BaseCommand.check
    
  181.         BaseCommand.check = patched_check
    
  182.         try:
    
  183.             management.call_command("dance", verbosity=0)
    
  184.             self.assertEqual(self.counter, 0)
    
  185.             management.call_command("dance", verbosity=0, skip_checks=False)
    
  186.             self.assertEqual(self.counter, 1)
    
  187.             self.assertEqual(self.kwargs, {})
    
  188.         finally:
    
  189.             BaseCommand.check = saved_check
    
  190. 
    
  191.     def test_requires_system_checks_empty(self):
    
  192.         with mock.patch(
    
  193.             "django.core.management.base.BaseCommand.check"
    
  194.         ) as mocked_check:
    
  195.             management.call_command("no_system_checks")
    
  196.         self.assertIs(mocked_check.called, False)
    
  197. 
    
  198.     def test_requires_system_checks_specific(self):
    
  199.         with mock.patch(
    
  200.             "django.core.management.base.BaseCommand.check"
    
  201.         ) as mocked_check:
    
  202.             management.call_command("specific_system_checks")
    
  203.         mocked_check.called_once_with(tags=[Tags.staticfiles, Tags.models])
    
  204. 
    
  205.     def test_requires_system_checks_invalid(self):
    
  206.         class Command(BaseCommand):
    
  207.             requires_system_checks = "x"
    
  208. 
    
  209.         msg = "requires_system_checks must be a list or tuple."
    
  210.         with self.assertRaisesMessage(TypeError, msg):
    
  211.             Command()
    
  212. 
    
  213.     def test_check_migrations(self):
    
  214.         requires_migrations_checks = dance.Command.requires_migrations_checks
    
  215.         self.assertIs(requires_migrations_checks, False)
    
  216.         try:
    
  217.             with mock.patch.object(BaseCommand, "check_migrations") as check_migrations:
    
  218.                 management.call_command("dance", verbosity=0)
    
  219.                 self.assertFalse(check_migrations.called)
    
  220.                 dance.Command.requires_migrations_checks = True
    
  221.                 management.call_command("dance", verbosity=0)
    
  222.                 self.assertTrue(check_migrations.called)
    
  223.         finally:
    
  224.             dance.Command.requires_migrations_checks = requires_migrations_checks
    
  225. 
    
  226.     def test_call_command_unrecognized_option(self):
    
  227.         msg = (
    
  228.             "Unknown option(s) for dance command: unrecognized. Valid options "
    
  229.             "are: example, force_color, help, integer, no_color, opt_3, "
    
  230.             "option3, pythonpath, settings, skip_checks, stderr, stdout, "
    
  231.             "style, traceback, verbosity, version."
    
  232.         )
    
  233.         with self.assertRaisesMessage(TypeError, msg):
    
  234.             management.call_command("dance", unrecognized=1)
    
  235. 
    
  236.         msg = (
    
  237.             "Unknown option(s) for dance command: unrecognized, unrecognized2. "
    
  238.             "Valid options are: example, force_color, help, integer, no_color, "
    
  239.             "opt_3, option3, pythonpath, settings, skip_checks, stderr, "
    
  240.             "stdout, style, traceback, verbosity, version."
    
  241.         )
    
  242.         with self.assertRaisesMessage(TypeError, msg):
    
  243.             management.call_command("dance", unrecognized=1, unrecognized2=1)
    
  244. 
    
  245.     def test_call_command_with_required_parameters_in_options(self):
    
  246.         out = StringIO()
    
  247.         management.call_command(
    
  248.             "required_option", need_me="foo", needme2="bar", stdout=out
    
  249.         )
    
  250.         self.assertIn("need_me", out.getvalue())
    
  251.         self.assertIn("needme2", out.getvalue())
    
  252. 
    
  253.     def test_call_command_with_required_parameters_in_mixed_options(self):
    
  254.         out = StringIO()
    
  255.         management.call_command(
    
  256.             "required_option", "--need-me=foo", needme2="bar", stdout=out
    
  257.         )
    
  258.         self.assertIn("need_me", out.getvalue())
    
  259.         self.assertIn("needme2", out.getvalue())
    
  260. 
    
  261.     def test_command_add_arguments_after_common_arguments(self):
    
  262.         out = StringIO()
    
  263.         management.call_command("common_args", stdout=out)
    
  264.         self.assertIn("Detected that --version already exists", out.getvalue())
    
  265. 
    
  266.     def test_mutually_exclusive_group_required_options(self):
    
  267.         out = StringIO()
    
  268.         management.call_command("mutually_exclusive_required", foo_id=1, stdout=out)
    
  269.         self.assertIn("foo_id", out.getvalue())
    
  270.         management.call_command(
    
  271.             "mutually_exclusive_required", foo_name="foo", stdout=out
    
  272.         )
    
  273.         self.assertIn("foo_name", out.getvalue())
    
  274.         msg = (
    
  275.             "Error: one of the arguments --foo-id --foo-name --foo-list "
    
  276.             "--append_const --const --count --flag_false --flag_true is "
    
  277.             "required"
    
  278.         )
    
  279.         with self.assertRaisesMessage(CommandError, msg):
    
  280.             management.call_command("mutually_exclusive_required", stdout=out)
    
  281. 
    
  282.     def test_mutually_exclusive_group_required_const_options(self):
    
  283.         tests = [
    
  284.             ("append_const", [42]),
    
  285.             ("const", 31),
    
  286.             ("count", 1),
    
  287.             ("flag_false", False),
    
  288.             ("flag_true", True),
    
  289.         ]
    
  290.         for arg, value in tests:
    
  291.             out = StringIO()
    
  292.             expected_output = "%s=%s" % (arg, value)
    
  293.             with self.subTest(arg=arg):
    
  294.                 management.call_command(
    
  295.                     "mutually_exclusive_required",
    
  296.                     "--%s" % arg,
    
  297.                     stdout=out,
    
  298.                 )
    
  299.                 self.assertIn(expected_output, out.getvalue())
    
  300.                 out.truncate(0)
    
  301.                 management.call_command(
    
  302.                     "mutually_exclusive_required",
    
  303.                     **{arg: value, "stdout": out},
    
  304.                 )
    
  305.                 self.assertIn(expected_output, out.getvalue())
    
  306. 
    
  307.     def test_mutually_exclusive_group_required_with_same_dest_options(self):
    
  308.         tests = [
    
  309.             {"until": "2"},
    
  310.             {"for": "1", "until": "2"},
    
  311.         ]
    
  312.         msg = (
    
  313.             "Cannot pass the dest 'until' that matches multiple arguments via "
    
  314.             "**options."
    
  315.         )
    
  316.         for options in tests:
    
  317.             with self.subTest(options=options):
    
  318.                 with self.assertRaisesMessage(TypeError, msg):
    
  319.                     management.call_command(
    
  320.                         "mutually_exclusive_required_with_same_dest",
    
  321.                         **options,
    
  322.                     )
    
  323. 
    
  324.     def test_mutually_exclusive_group_required_with_same_dest_args(self):
    
  325.         tests = [
    
  326.             ("--until=1",),
    
  327.             ("--until", 1),
    
  328.             ("--for=1",),
    
  329.             ("--for", 1),
    
  330.         ]
    
  331.         for args in tests:
    
  332.             out = StringIO()
    
  333.             with self.subTest(options=args):
    
  334.                 management.call_command(
    
  335.                     "mutually_exclusive_required_with_same_dest",
    
  336.                     *args,
    
  337.                     stdout=out,
    
  338.                 )
    
  339.                 output = out.getvalue()
    
  340.                 self.assertIn("until=1", output)
    
  341. 
    
  342.     def test_required_list_option(self):
    
  343.         tests = [
    
  344.             (("--foo-list", [1, 2]), {}),
    
  345.             ((), {"foo_list": [1, 2]}),
    
  346.         ]
    
  347.         for command in ["mutually_exclusive_required", "required_list_option"]:
    
  348.             for args, kwargs in tests:
    
  349.                 with self.subTest(command=command, args=args, kwargs=kwargs):
    
  350.                     out = StringIO()
    
  351.                     management.call_command(
    
  352.                         command,
    
  353.                         *args,
    
  354.                         **{**kwargs, "stdout": out},
    
  355.                     )
    
  356.                     self.assertIn("foo_list=[1, 2]", out.getvalue())
    
  357. 
    
  358.     def test_required_const_options(self):
    
  359.         args = {
    
  360.             "append_const": [42],
    
  361.             "const": 31,
    
  362.             "count": 1,
    
  363.             "flag_false": False,
    
  364.             "flag_true": True,
    
  365.         }
    
  366.         expected_output = "\n".join(
    
  367.             "%s=%s" % (arg, value) for arg, value in args.items()
    
  368.         )
    
  369.         out = StringIO()
    
  370.         management.call_command(
    
  371.             "required_constant_option",
    
  372.             "--append_const",
    
  373.             "--const",
    
  374.             "--count",
    
  375.             "--flag_false",
    
  376.             "--flag_true",
    
  377.             stdout=out,
    
  378.         )
    
  379.         self.assertIn(expected_output, out.getvalue())
    
  380.         out.truncate(0)
    
  381.         management.call_command("required_constant_option", **{**args, "stdout": out})
    
  382.         self.assertIn(expected_output, out.getvalue())
    
  383. 
    
  384.     def test_subparser(self):
    
  385.         out = StringIO()
    
  386.         management.call_command("subparser", "foo", 12, stdout=out)
    
  387.         self.assertIn("bar", out.getvalue())
    
  388. 
    
  389.     def test_subparser_dest_args(self):
    
  390.         out = StringIO()
    
  391.         management.call_command("subparser_dest", "foo", bar=12, stdout=out)
    
  392.         self.assertIn("bar", out.getvalue())
    
  393. 
    
  394.     def test_subparser_dest_required_args(self):
    
  395.         out = StringIO()
    
  396.         management.call_command(
    
  397.             "subparser_required", "foo_1", "foo_2", bar=12, stdout=out
    
  398.         )
    
  399.         self.assertIn("bar", out.getvalue())
    
  400. 
    
  401.     def test_subparser_invalid_option(self):
    
  402.         msg = "invalid choice: 'test' (choose from 'foo')"
    
  403.         with self.assertRaisesMessage(CommandError, msg):
    
  404.             management.call_command("subparser", "test", 12)
    
  405.         msg = "Error: the following arguments are required: subcommand"
    
  406.         with self.assertRaisesMessage(CommandError, msg):
    
  407.             management.call_command("subparser_dest", subcommand="foo", bar=12)
    
  408. 
    
  409.     def test_create_parser_kwargs(self):
    
  410.         """BaseCommand.create_parser() passes kwargs to CommandParser."""
    
  411.         epilog = "some epilog text"
    
  412.         parser = BaseCommand().create_parser(
    
  413.             "prog_name",
    
  414.             "subcommand",
    
  415.             epilog=epilog,
    
  416.             formatter_class=ArgumentDefaultsHelpFormatter,
    
  417.         )
    
  418.         self.assertEqual(parser.epilog, epilog)
    
  419.         self.assertEqual(parser.formatter_class, ArgumentDefaultsHelpFormatter)
    
  420. 
    
  421.     def test_outputwrapper_flush(self):
    
  422.         out = StringIO()
    
  423.         with mock.patch.object(out, "flush") as mocked_flush:
    
  424.             management.call_command("outputwrapper", stdout=out)
    
  425.         self.assertIn("Working...", out.getvalue())
    
  426.         self.assertIs(mocked_flush.called, True)
    
  427. 
    
  428. 
    
  429. class CommandRunTests(AdminScriptTestCase):
    
  430.     """
    
  431.     Tests that need to run by simulating the command line, not by call_command.
    
  432.     """
    
  433. 
    
  434.     def test_script_prefix_set_in_commands(self):
    
  435.         self.write_settings(
    
  436.             "settings.py",
    
  437.             apps=["user_commands"],
    
  438.             sdict={
    
  439.                 "ROOT_URLCONF": '"user_commands.urls"',
    
  440.                 "FORCE_SCRIPT_NAME": '"/PREFIX/"',
    
  441.             },
    
  442.         )
    
  443.         out, err = self.run_manage(["reverse_url"])
    
  444.         self.assertNoOutput(err)
    
  445.         self.assertEqual(out.strip(), "/PREFIX/some/url/")
    
  446. 
    
  447.     def test_disallowed_abbreviated_options(self):
    
  448.         """
    
  449.         To avoid conflicts with custom options, commands don't allow
    
  450.         abbreviated forms of the --setting and --pythonpath options.
    
  451.         """
    
  452.         self.write_settings("settings.py", apps=["user_commands"])
    
  453.         out, err = self.run_manage(["set_option", "--set", "foo"])
    
  454.         self.assertNoOutput(err)
    
  455.         self.assertEqual(out.strip(), "Set foo")
    
  456. 
    
  457.     def test_skip_checks(self):
    
  458.         self.write_settings(
    
  459.             "settings.py",
    
  460.             apps=["django.contrib.staticfiles", "user_commands"],
    
  461.             sdict={
    
  462.                 # (staticfiles.E001) The STATICFILES_DIRS setting is not a tuple or
    
  463.                 # list.
    
  464.                 "STATICFILES_DIRS": '"foo"',
    
  465.             },
    
  466.         )
    
  467.         out, err = self.run_manage(["set_option", "--skip-checks", "--set", "foo"])
    
  468.         self.assertNoOutput(err)
    
  469.         self.assertEqual(out.strip(), "Set foo")
    
  470. 
    
  471. 
    
  472. class UtilsTests(SimpleTestCase):
    
  473.     def test_no_existent_external_program(self):
    
  474.         msg = "Error executing a_42_command_that_doesnt_exist_42"
    
  475.         with self.assertRaisesMessage(CommandError, msg):
    
  476.             popen_wrapper(["a_42_command_that_doesnt_exist_42"])
    
  477. 
    
  478.     def test_get_random_secret_key(self):
    
  479.         key = get_random_secret_key()
    
  480.         self.assertEqual(len(key), 50)
    
  481.         for char in key:
    
  482.             self.assertIn(char, "abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*(-_=+)")
    
  483. 
    
  484.     def test_is_ignored_path_true(self):
    
  485.         patterns = (
    
  486.             ["foo/bar/baz"],
    
  487.             ["baz"],
    
  488.             ["foo/bar/baz"],
    
  489.             ["*/baz"],
    
  490.             ["*"],
    
  491.             ["b?z"],
    
  492.             ["[abc]az"],
    
  493.             ["*/ba[!z]/baz"],
    
  494.         )
    
  495.         for ignore_patterns in patterns:
    
  496.             with self.subTest(ignore_patterns=ignore_patterns):
    
  497.                 self.assertIs(
    
  498.                     is_ignored_path("foo/bar/baz", ignore_patterns=ignore_patterns),
    
  499.                     True,
    
  500.                 )
    
  501. 
    
  502.     def test_is_ignored_path_false(self):
    
  503.         self.assertIs(
    
  504.             is_ignored_path(
    
  505.                 "foo/bar/baz", ignore_patterns=["foo/bar/bat", "bar", "flub/blub"]
    
  506.             ),
    
  507.             False,
    
  508.         )
    
  509. 
    
  510.     def test_normalize_path_patterns_truncates_wildcard_base(self):
    
  511.         expected = [os.path.normcase(p) for p in ["foo/bar", "bar/*/"]]
    
  512.         self.assertEqual(normalize_path_patterns(["foo/bar/*", "bar/*/"]), expected)