1. from django.contrib.messages import constants, get_level, set_level
    
  2. from django.contrib.messages.api import MessageFailure
    
  3. from django.contrib.messages.constants import DEFAULT_LEVELS
    
  4. from django.contrib.messages.storage import default_storage
    
  5. from django.contrib.messages.storage.base import Message
    
  6. from django.http import HttpRequest, HttpResponse
    
  7. from django.test import modify_settings, override_settings
    
  8. from django.urls import reverse
    
  9. from django.utils.translation import gettext_lazy
    
  10. 
    
  11. 
    
  12. def add_level_messages(storage):
    
  13.     """
    
  14.     Add 6 messages from different levels (including a custom one) to a storage
    
  15.     instance.
    
  16.     """
    
  17.     storage.add(constants.INFO, "A generic info message")
    
  18.     storage.add(29, "Some custom level")
    
  19.     storage.add(constants.DEBUG, "A debugging message", extra_tags="extra-tag")
    
  20.     storage.add(constants.WARNING, "A warning")
    
  21.     storage.add(constants.ERROR, "An error")
    
  22.     storage.add(constants.SUCCESS, "This was a triumph.")
    
  23. 
    
  24. 
    
  25. class BaseTests:
    
  26.     storage_class = default_storage
    
  27.     levels = {
    
  28.         "debug": constants.DEBUG,
    
  29.         "info": constants.INFO,
    
  30.         "success": constants.SUCCESS,
    
  31.         "warning": constants.WARNING,
    
  32.         "error": constants.ERROR,
    
  33.     }
    
  34. 
    
  35.     def setUp(self):
    
  36.         self.settings_override = override_settings(
    
  37.             TEMPLATES=[
    
  38.                 {
    
  39.                     "BACKEND": "django.template.backends.django.DjangoTemplates",
    
  40.                     "DIRS": [],
    
  41.                     "APP_DIRS": True,
    
  42.                     "OPTIONS": {
    
  43.                         "context_processors": (
    
  44.                             "django.contrib.auth.context_processors.auth",
    
  45.                             "django.contrib.messages.context_processors.messages",
    
  46.                         ),
    
  47.                     },
    
  48.                 }
    
  49.             ],
    
  50.             ROOT_URLCONF="messages_tests.urls",
    
  51.             MESSAGE_TAGS={},
    
  52.             MESSAGE_STORAGE="%s.%s"
    
  53.             % (self.storage_class.__module__, self.storage_class.__name__),
    
  54.             SESSION_SERIALIZER="django.contrib.sessions.serializers.JSONSerializer",
    
  55.         )
    
  56.         self.settings_override.enable()
    
  57. 
    
  58.     def tearDown(self):
    
  59.         self.settings_override.disable()
    
  60. 
    
  61.     def get_request(self):
    
  62.         return HttpRequest()
    
  63. 
    
  64.     def get_response(self):
    
  65.         return HttpResponse()
    
  66. 
    
  67.     def get_storage(self, data=None):
    
  68.         """
    
  69.         Return the storage backend, setting its loaded data to the ``data``
    
  70.         argument.
    
  71. 
    
  72.         This method avoids the storage ``_get`` method from getting called so
    
  73.         that other parts of the storage backend can be tested independent of
    
  74.         the message retrieval logic.
    
  75.         """
    
  76.         storage = self.storage_class(self.get_request())
    
  77.         storage._loaded_data = data or []
    
  78.         return storage
    
  79. 
    
  80.     def test_repr(self):
    
  81.         request = self.get_request()
    
  82.         storage = self.storage_class(request)
    
  83.         self.assertEqual(
    
  84.             repr(storage),
    
  85.             f"<{self.storage_class.__qualname__}: request=<HttpRequest>>",
    
  86.         )
    
  87. 
    
  88.     def test_add(self):
    
  89.         storage = self.get_storage()
    
  90.         self.assertFalse(storage.added_new)
    
  91.         storage.add(constants.INFO, "Test message 1")
    
  92.         self.assertTrue(storage.added_new)
    
  93.         storage.add(constants.INFO, "Test message 2", extra_tags="tag")
    
  94.         self.assertEqual(len(storage), 2)
    
  95. 
    
  96.     def test_add_lazy_translation(self):
    
  97.         storage = self.get_storage()
    
  98.         response = self.get_response()
    
  99. 
    
  100.         storage.add(constants.INFO, gettext_lazy("lazy message"))
    
  101.         storage.update(response)
    
  102. 
    
  103.         storing = self.stored_messages_count(storage, response)
    
  104.         self.assertEqual(storing, 1)
    
  105. 
    
  106.     def test_no_update(self):
    
  107.         storage = self.get_storage()
    
  108.         response = self.get_response()
    
  109.         storage.update(response)
    
  110.         storing = self.stored_messages_count(storage, response)
    
  111.         self.assertEqual(storing, 0)
    
  112. 
    
  113.     def test_add_update(self):
    
  114.         storage = self.get_storage()
    
  115.         response = self.get_response()
    
  116. 
    
  117.         storage.add(constants.INFO, "Test message 1")
    
  118.         storage.add(constants.INFO, "Test message 1", extra_tags="tag")
    
  119.         storage.update(response)
    
  120. 
    
  121.         storing = self.stored_messages_count(storage, response)
    
  122.         self.assertEqual(storing, 2)
    
  123. 
    
  124.     def test_existing_add_read_update(self):
    
  125.         storage = self.get_existing_storage()
    
  126.         response = self.get_response()
    
  127. 
    
  128.         storage.add(constants.INFO, "Test message 3")
    
  129.         list(storage)  # Simulates a read
    
  130.         storage.update(response)
    
  131. 
    
  132.         storing = self.stored_messages_count(storage, response)
    
  133.         self.assertEqual(storing, 0)
    
  134. 
    
  135.     def test_existing_read_add_update(self):
    
  136.         storage = self.get_existing_storage()
    
  137.         response = self.get_response()
    
  138. 
    
  139.         list(storage)  # Simulates a read
    
  140.         storage.add(constants.INFO, "Test message 3")
    
  141.         storage.update(response)
    
  142. 
    
  143.         storing = self.stored_messages_count(storage, response)
    
  144.         self.assertEqual(storing, 1)
    
  145. 
    
  146.     @override_settings(MESSAGE_LEVEL=constants.DEBUG)
    
  147.     def test_full_request_response_cycle(self):
    
  148.         """
    
  149.         With the message middleware enabled, messages are properly stored and
    
  150.         retrieved across the full request/redirect/response cycle.
    
  151.         """
    
  152.         data = {
    
  153.             "messages": ["Test message %d" % x for x in range(5)],
    
  154.         }
    
  155.         show_url = reverse("show_message")
    
  156.         for level in ("debug", "info", "success", "warning", "error"):
    
  157.             add_url = reverse("add_message", args=(level,))
    
  158.             response = self.client.post(add_url, data, follow=True)
    
  159.             self.assertRedirects(response, show_url)
    
  160.             self.assertIn("messages", response.context)
    
  161.             messages = [Message(self.levels[level], msg) for msg in data["messages"]]
    
  162.             self.assertEqual(list(response.context["messages"]), messages)
    
  163.             for msg in data["messages"]:
    
  164.                 self.assertContains(response, msg)
    
  165. 
    
  166.     @override_settings(MESSAGE_LEVEL=constants.DEBUG)
    
  167.     def test_with_template_response(self):
    
  168.         data = {
    
  169.             "messages": ["Test message %d" % x for x in range(5)],
    
  170.         }
    
  171.         show_url = reverse("show_template_response")
    
  172.         for level in self.levels:
    
  173.             add_url = reverse("add_template_response", args=(level,))
    
  174.             response = self.client.post(add_url, data, follow=True)
    
  175.             self.assertRedirects(response, show_url)
    
  176.             self.assertIn("messages", response.context)
    
  177.             for msg in data["messages"]:
    
  178.                 self.assertContains(response, msg)
    
  179. 
    
  180.             # there shouldn't be any messages on second GET request
    
  181.             response = self.client.get(show_url)
    
  182.             for msg in data["messages"]:
    
  183.                 self.assertNotContains(response, msg)
    
  184. 
    
  185.     def test_context_processor_message_levels(self):
    
  186.         show_url = reverse("show_template_response")
    
  187.         response = self.client.get(show_url)
    
  188. 
    
  189.         self.assertIn("DEFAULT_MESSAGE_LEVELS", response.context)
    
  190.         self.assertEqual(response.context["DEFAULT_MESSAGE_LEVELS"], DEFAULT_LEVELS)
    
  191. 
    
  192.     @override_settings(MESSAGE_LEVEL=constants.DEBUG)
    
  193.     def test_multiple_posts(self):
    
  194.         """
    
  195.         Messages persist properly when multiple POSTs are made before a GET.
    
  196.         """
    
  197.         data = {
    
  198.             "messages": ["Test message %d" % x for x in range(5)],
    
  199.         }
    
  200.         show_url = reverse("show_message")
    
  201.         messages = []
    
  202.         for level in ("debug", "info", "success", "warning", "error"):
    
  203.             messages.extend(
    
  204.                 Message(self.levels[level], msg) for msg in data["messages"]
    
  205.             )
    
  206.             add_url = reverse("add_message", args=(level,))
    
  207.             self.client.post(add_url, data)
    
  208.         response = self.client.get(show_url)
    
  209.         self.assertIn("messages", response.context)
    
  210.         self.assertEqual(list(response.context["messages"]), messages)
    
  211.         for msg in data["messages"]:
    
  212.             self.assertContains(response, msg)
    
  213. 
    
  214.     @modify_settings(
    
  215.         INSTALLED_APPS={"remove": "django.contrib.messages"},
    
  216.         MIDDLEWARE={"remove": "django.contrib.messages.middleware.MessageMiddleware"},
    
  217.     )
    
  218.     @override_settings(
    
  219.         MESSAGE_LEVEL=constants.DEBUG,
    
  220.         TEMPLATES=[
    
  221.             {
    
  222.                 "BACKEND": "django.template.backends.django.DjangoTemplates",
    
  223.                 "DIRS": [],
    
  224.                 "APP_DIRS": True,
    
  225.             }
    
  226.         ],
    
  227.     )
    
  228.     def test_middleware_disabled(self):
    
  229.         """
    
  230.         When the middleware is disabled, an exception is raised when one
    
  231.         attempts to store a message.
    
  232.         """
    
  233.         data = {
    
  234.             "messages": ["Test message %d" % x for x in range(5)],
    
  235.         }
    
  236.         reverse("show_message")
    
  237.         for level in ("debug", "info", "success", "warning", "error"):
    
  238.             add_url = reverse("add_message", args=(level,))
    
  239.             with self.assertRaises(MessageFailure):
    
  240.                 self.client.post(add_url, data, follow=True)
    
  241. 
    
  242.     @modify_settings(
    
  243.         INSTALLED_APPS={"remove": "django.contrib.messages"},
    
  244.         MIDDLEWARE={"remove": "django.contrib.messages.middleware.MessageMiddleware"},
    
  245.     )
    
  246.     @override_settings(
    
  247.         TEMPLATES=[
    
  248.             {
    
  249.                 "BACKEND": "django.template.backends.django.DjangoTemplates",
    
  250.                 "DIRS": [],
    
  251.                 "APP_DIRS": True,
    
  252.             }
    
  253.         ],
    
  254.     )
    
  255.     def test_middleware_disabled_fail_silently(self):
    
  256.         """
    
  257.         When the middleware is disabled, an exception is not raised
    
  258.         if 'fail_silently' is True.
    
  259.         """
    
  260.         data = {
    
  261.             "messages": ["Test message %d" % x for x in range(5)],
    
  262.             "fail_silently": True,
    
  263.         }
    
  264.         show_url = reverse("show_message")
    
  265.         for level in ("debug", "info", "success", "warning", "error"):
    
  266.             add_url = reverse("add_message", args=(level,))
    
  267.             response = self.client.post(add_url, data, follow=True)
    
  268.             self.assertRedirects(response, show_url)
    
  269.             self.assertNotIn("messages", response.context)
    
  270. 
    
  271.     def stored_messages_count(self, storage, response):
    
  272.         """
    
  273.         Return the number of messages being stored after a
    
  274.         ``storage.update()`` call.
    
  275.         """
    
  276.         raise NotImplementedError("This method must be set by a subclass.")
    
  277. 
    
  278.     def test_get(self):
    
  279.         raise NotImplementedError("This method must be set by a subclass.")
    
  280. 
    
  281.     def get_existing_storage(self):
    
  282.         return self.get_storage(
    
  283.             [
    
  284.                 Message(constants.INFO, "Test message 1"),
    
  285.                 Message(constants.INFO, "Test message 2", extra_tags="tag"),
    
  286.             ]
    
  287.         )
    
  288. 
    
  289.     def test_existing_read(self):
    
  290.         """
    
  291.         Reading the existing storage doesn't cause the data to be lost.
    
  292.         """
    
  293.         storage = self.get_existing_storage()
    
  294.         self.assertFalse(storage.used)
    
  295.         # After iterating the storage engine directly, the used flag is set.
    
  296.         data = list(storage)
    
  297.         self.assertTrue(storage.used)
    
  298.         # The data does not disappear because it has been iterated.
    
  299.         self.assertEqual(data, list(storage))
    
  300. 
    
  301.     def test_existing_add(self):
    
  302.         storage = self.get_existing_storage()
    
  303.         self.assertFalse(storage.added_new)
    
  304.         storage.add(constants.INFO, "Test message 3")
    
  305.         self.assertTrue(storage.added_new)
    
  306. 
    
  307.     def test_default_level(self):
    
  308.         # get_level works even with no storage on the request.
    
  309.         request = self.get_request()
    
  310.         self.assertEqual(get_level(request), constants.INFO)
    
  311. 
    
  312.         # get_level returns the default level if it hasn't been set.
    
  313.         storage = self.get_storage()
    
  314.         request._messages = storage
    
  315.         self.assertEqual(get_level(request), constants.INFO)
    
  316. 
    
  317.         # Only messages of sufficient level get recorded.
    
  318.         add_level_messages(storage)
    
  319.         self.assertEqual(len(storage), 5)
    
  320. 
    
  321.     def test_low_level(self):
    
  322.         request = self.get_request()
    
  323.         storage = self.storage_class(request)
    
  324.         request._messages = storage
    
  325. 
    
  326.         self.assertTrue(set_level(request, 5))
    
  327.         self.assertEqual(get_level(request), 5)
    
  328. 
    
  329.         add_level_messages(storage)
    
  330.         self.assertEqual(len(storage), 6)
    
  331. 
    
  332.     def test_high_level(self):
    
  333.         request = self.get_request()
    
  334.         storage = self.storage_class(request)
    
  335.         request._messages = storage
    
  336. 
    
  337.         self.assertTrue(set_level(request, 30))
    
  338.         self.assertEqual(get_level(request), 30)
    
  339. 
    
  340.         add_level_messages(storage)
    
  341.         self.assertEqual(len(storage), 2)
    
  342. 
    
  343.     @override_settings(MESSAGE_LEVEL=29)
    
  344.     def test_settings_level(self):
    
  345.         request = self.get_request()
    
  346.         storage = self.storage_class(request)
    
  347. 
    
  348.         self.assertEqual(get_level(request), 29)
    
  349. 
    
  350.         add_level_messages(storage)
    
  351.         self.assertEqual(len(storage), 3)
    
  352. 
    
  353.     def test_tags(self):
    
  354.         storage = self.get_storage()
    
  355.         storage.level = 0
    
  356.         add_level_messages(storage)
    
  357.         storage.add(constants.INFO, "A generic info message", extra_tags=None)
    
  358.         tags = [msg.tags for msg in storage]
    
  359.         self.assertEqual(
    
  360.             tags, ["info", "", "extra-tag debug", "warning", "error", "success", "info"]
    
  361.         )
    
  362. 
    
  363.     def test_level_tag(self):
    
  364.         storage = self.get_storage()
    
  365.         storage.level = 0
    
  366.         add_level_messages(storage)
    
  367.         tags = [msg.level_tag for msg in storage]
    
  368.         self.assertEqual(tags, ["info", "", "debug", "warning", "error", "success"])
    
  369. 
    
  370.     @override_settings(
    
  371.         MESSAGE_TAGS={
    
  372.             constants.INFO: "info",
    
  373.             constants.DEBUG: "",
    
  374.             constants.WARNING: "",
    
  375.             constants.ERROR: "bad",
    
  376.             29: "custom",
    
  377.         }
    
  378.     )
    
  379.     def test_custom_tags(self):
    
  380.         storage = self.get_storage()
    
  381.         storage.level = 0
    
  382.         add_level_messages(storage)
    
  383.         tags = [msg.tags for msg in storage]
    
  384.         self.assertEqual(tags, ["info", "custom", "extra-tag", "", "bad", "success"])