1. """
    
  2. Tests for django.core.servers.
    
  3. """
    
  4. import errno
    
  5. import os
    
  6. import socket
    
  7. import threading
    
  8. from http.client import HTTPConnection
    
  9. from urllib.error import HTTPError
    
  10. from urllib.parse import urlencode
    
  11. from urllib.request import urlopen
    
  12. 
    
  13. from django.conf import settings
    
  14. from django.core.servers.basehttp import ThreadedWSGIServer, WSGIServer
    
  15. from django.db import DEFAULT_DB_ALIAS, connections
    
  16. from django.test import LiveServerTestCase, override_settings
    
  17. from django.test.testcases import LiveServerThread, QuietWSGIRequestHandler
    
  18. 
    
  19. from .models import Person
    
  20. 
    
  21. TEST_ROOT = os.path.dirname(__file__)
    
  22. TEST_SETTINGS = {
    
  23.     "MEDIA_URL": "media/",
    
  24.     "MEDIA_ROOT": os.path.join(TEST_ROOT, "media"),
    
  25.     "STATIC_URL": "static/",
    
  26.     "STATIC_ROOT": os.path.join(TEST_ROOT, "static"),
    
  27. }
    
  28. 
    
  29. 
    
  30. @override_settings(ROOT_URLCONF="servers.urls", **TEST_SETTINGS)
    
  31. class LiveServerBase(LiveServerTestCase):
    
  32.     available_apps = [
    
  33.         "servers",
    
  34.         "django.contrib.auth",
    
  35.         "django.contrib.contenttypes",
    
  36.         "django.contrib.sessions",
    
  37.     ]
    
  38.     fixtures = ["testdata.json"]
    
  39. 
    
  40.     def urlopen(self, url):
    
  41.         return urlopen(self.live_server_url + url)
    
  42. 
    
  43. 
    
  44. class CloseConnectionTestServer(ThreadedWSGIServer):
    
  45.     def __init__(self, *args, **kwargs):
    
  46.         super().__init__(*args, **kwargs)
    
  47.         # This event is set right after the first time a request closes its
    
  48.         # database connections.
    
  49.         self._connections_closed = threading.Event()
    
  50. 
    
  51.     def _close_connections(self):
    
  52.         super()._close_connections()
    
  53.         self._connections_closed.set()
    
  54. 
    
  55. 
    
  56. class CloseConnectionTestLiveServerThread(LiveServerThread):
    
  57.     server_class = CloseConnectionTestServer
    
  58. 
    
  59.     def _create_server(self, connections_override=None):
    
  60.         return super()._create_server(connections_override=self.connections_override)
    
  61. 
    
  62. 
    
  63. class LiveServerTestCloseConnectionTest(LiveServerBase):
    
  64.     server_thread_class = CloseConnectionTestLiveServerThread
    
  65. 
    
  66.     @classmethod
    
  67.     def _make_connections_override(cls):
    
  68.         conn = connections[DEFAULT_DB_ALIAS]
    
  69.         cls.conn = conn
    
  70.         cls.old_conn_max_age = conn.settings_dict["CONN_MAX_AGE"]
    
  71.         # Set the connection's CONN_MAX_AGE to None to simulate the
    
  72.         # CONN_MAX_AGE setting being set to None on the server. This prevents
    
  73.         # Django from closing the connection and allows testing that
    
  74.         # ThreadedWSGIServer closes connections.
    
  75.         conn.settings_dict["CONN_MAX_AGE"] = None
    
  76.         # Pass a database connection through to the server to check it is being
    
  77.         # closed by ThreadedWSGIServer.
    
  78.         return {DEFAULT_DB_ALIAS: conn}
    
  79. 
    
  80.     @classmethod
    
  81.     def tearDownConnectionTest(cls):
    
  82.         cls.conn.settings_dict["CONN_MAX_AGE"] = cls.old_conn_max_age
    
  83. 
    
  84.     @classmethod
    
  85.     def tearDownClass(cls):
    
  86.         cls.tearDownConnectionTest()
    
  87.         super().tearDownClass()
    
  88. 
    
  89.     def test_closes_connections(self):
    
  90.         # The server's request thread sets this event after closing
    
  91.         # its database connections.
    
  92.         closed_event = self.server_thread.httpd._connections_closed
    
  93.         conn = self.conn
    
  94.         # Open a connection to the database.
    
  95.         conn.connect()
    
  96.         self.assertIsNotNone(conn.connection)
    
  97.         with self.urlopen("/model_view/") as f:
    
  98.             # The server can access the database.
    
  99.             self.assertCountEqual(f.read().splitlines(), [b"jane", b"robert"])
    
  100.         # Wait for the server's request thread to close the connection.
    
  101.         # A timeout of 0.1 seconds should be more than enough. If the wait
    
  102.         # times out, the assertion after should fail.
    
  103.         closed_event.wait(timeout=0.1)
    
  104.         self.assertIsNone(conn.connection)
    
  105. 
    
  106. 
    
  107. class FailingLiveServerThread(LiveServerThread):
    
  108.     def _create_server(self):
    
  109.         raise RuntimeError("Error creating server.")
    
  110. 
    
  111. 
    
  112. class LiveServerTestCaseSetupTest(LiveServerBase):
    
  113.     server_thread_class = FailingLiveServerThread
    
  114. 
    
  115.     @classmethod
    
  116.     def check_allowed_hosts(cls, expected):
    
  117.         if settings.ALLOWED_HOSTS != expected:
    
  118.             raise RuntimeError(f"{settings.ALLOWED_HOSTS} != {expected}")
    
  119. 
    
  120.     @classmethod
    
  121.     def setUpClass(cls):
    
  122.         cls.check_allowed_hosts(["testserver"])
    
  123.         try:
    
  124.             super().setUpClass()
    
  125.         except RuntimeError:
    
  126.             # LiveServerTestCase's change to ALLOWED_HOSTS should be reverted.
    
  127.             cls.doClassCleanups()
    
  128.             cls.check_allowed_hosts(["testserver"])
    
  129.         else:
    
  130.             raise RuntimeError("Server did not fail.")
    
  131.         cls.set_up_called = True
    
  132. 
    
  133.     def test_set_up_class(self):
    
  134.         self.assertIs(self.set_up_called, True)
    
  135. 
    
  136. 
    
  137. class LiveServerAddress(LiveServerBase):
    
  138.     @classmethod
    
  139.     def setUpClass(cls):
    
  140.         super().setUpClass()
    
  141.         # put it in a list to prevent descriptor lookups in test
    
  142.         cls.live_server_url_test = [cls.live_server_url]
    
  143. 
    
  144.     def test_live_server_url_is_class_property(self):
    
  145.         self.assertIsInstance(self.live_server_url_test[0], str)
    
  146.         self.assertEqual(self.live_server_url_test[0], self.live_server_url)
    
  147. 
    
  148. 
    
  149. class LiveServerSingleThread(LiveServerThread):
    
  150.     def _create_server(self):
    
  151.         return WSGIServer(
    
  152.             (self.host, self.port), QuietWSGIRequestHandler, allow_reuse_address=False
    
  153.         )
    
  154. 
    
  155. 
    
  156. class SingleThreadLiveServerTestCase(LiveServerTestCase):
    
  157.     server_thread_class = LiveServerSingleThread
    
  158. 
    
  159. 
    
  160. class LiveServerViews(LiveServerBase):
    
  161.     def test_protocol(self):
    
  162.         """Launched server serves with HTTP 1.1."""
    
  163.         with self.urlopen("/example_view/") as f:
    
  164.             self.assertEqual(f.version, 11)
    
  165. 
    
  166.     def test_closes_connection_without_content_length(self):
    
  167.         """
    
  168.         An HTTP 1.1 server is supposed to support keep-alive. Since our
    
  169.         development server is rather simple we support it only in cases where
    
  170.         we can detect a content length from the response. This should be doable
    
  171.         for all simple views and streaming responses where an iterable with
    
  172.         length of one is passed. The latter follows as result of `set_content_length`
    
  173.         from https://github.com/python/cpython/blob/main/Lib/wsgiref/handlers.py.
    
  174. 
    
  175.         If we cannot detect a content length we explicitly set the `Connection`
    
  176.         header to `close` to notify the client that we do not actually support
    
  177.         it.
    
  178.         """
    
  179.         conn = HTTPConnection(
    
  180.             LiveServerViews.server_thread.host,
    
  181.             LiveServerViews.server_thread.port,
    
  182.             timeout=1,
    
  183.         )
    
  184.         try:
    
  185.             conn.request(
    
  186.                 "GET", "/streaming_example_view/", headers={"Connection": "keep-alive"}
    
  187.             )
    
  188.             response = conn.getresponse()
    
  189.             self.assertTrue(response.will_close)
    
  190.             self.assertEqual(response.read(), b"Iamastream")
    
  191.             self.assertEqual(response.status, 200)
    
  192.             self.assertEqual(response.getheader("Connection"), "close")
    
  193. 
    
  194.             conn.request(
    
  195.                 "GET", "/streaming_example_view/", headers={"Connection": "close"}
    
  196.             )
    
  197.             response = conn.getresponse()
    
  198.             self.assertTrue(response.will_close)
    
  199.             self.assertEqual(response.read(), b"Iamastream")
    
  200.             self.assertEqual(response.status, 200)
    
  201.             self.assertEqual(response.getheader("Connection"), "close")
    
  202.         finally:
    
  203.             conn.close()
    
  204. 
    
  205.     def test_keep_alive_on_connection_with_content_length(self):
    
  206.         """
    
  207.         See `test_closes_connection_without_content_length` for details. This
    
  208.         is a follow up test, which ensure that we do not close the connection
    
  209.         if not needed, hence allowing us to take advantage of keep-alive.
    
  210.         """
    
  211.         conn = HTTPConnection(
    
  212.             LiveServerViews.server_thread.host, LiveServerViews.server_thread.port
    
  213.         )
    
  214.         try:
    
  215.             conn.request("GET", "/example_view/", headers={"Connection": "keep-alive"})
    
  216.             response = conn.getresponse()
    
  217.             self.assertFalse(response.will_close)
    
  218.             self.assertEqual(response.read(), b"example view")
    
  219.             self.assertEqual(response.status, 200)
    
  220.             self.assertIsNone(response.getheader("Connection"))
    
  221. 
    
  222.             conn.request("GET", "/example_view/", headers={"Connection": "close"})
    
  223.             response = conn.getresponse()
    
  224.             self.assertFalse(response.will_close)
    
  225.             self.assertEqual(response.read(), b"example view")
    
  226.             self.assertEqual(response.status, 200)
    
  227.             self.assertIsNone(response.getheader("Connection"))
    
  228.         finally:
    
  229.             conn.close()
    
  230. 
    
  231.     def test_keep_alive_connection_clears_previous_request_data(self):
    
  232.         conn = HTTPConnection(
    
  233.             LiveServerViews.server_thread.host, LiveServerViews.server_thread.port
    
  234.         )
    
  235.         try:
    
  236.             conn.request(
    
  237.                 "POST", "/method_view/", b"{}", headers={"Connection": "keep-alive"}
    
  238.             )
    
  239.             response = conn.getresponse()
    
  240.             self.assertFalse(response.will_close)
    
  241.             self.assertEqual(response.status, 200)
    
  242.             self.assertEqual(response.read(), b"POST")
    
  243. 
    
  244.             conn.request(
    
  245.                 "POST", "/method_view/", b"{}", headers={"Connection": "close"}
    
  246.             )
    
  247.             response = conn.getresponse()
    
  248.             self.assertFalse(response.will_close)
    
  249.             self.assertEqual(response.status, 200)
    
  250.             self.assertEqual(response.read(), b"POST")
    
  251.         finally:
    
  252.             conn.close()
    
  253. 
    
  254.     def test_404(self):
    
  255.         with self.assertRaises(HTTPError) as err:
    
  256.             self.urlopen("/")
    
  257.         err.exception.close()
    
  258.         self.assertEqual(err.exception.code, 404, "Expected 404 response")
    
  259. 
    
  260.     def test_view(self):
    
  261.         with self.urlopen("/example_view/") as f:
    
  262.             self.assertEqual(f.read(), b"example view")
    
  263. 
    
  264.     def test_static_files(self):
    
  265.         with self.urlopen("/static/example_static_file.txt") as f:
    
  266.             self.assertEqual(f.read().rstrip(b"\r\n"), b"example static file")
    
  267. 
    
  268.     def test_no_collectstatic_emulation(self):
    
  269.         """
    
  270.         LiveServerTestCase reports a 404 status code when HTTP client
    
  271.         tries to access a static file that isn't explicitly put under
    
  272.         STATIC_ROOT.
    
  273.         """
    
  274.         with self.assertRaises(HTTPError) as err:
    
  275.             self.urlopen("/static/another_app/another_app_static_file.txt")
    
  276.         err.exception.close()
    
  277.         self.assertEqual(err.exception.code, 404, "Expected 404 response")
    
  278. 
    
  279.     def test_media_files(self):
    
  280.         with self.urlopen("/media/example_media_file.txt") as f:
    
  281.             self.assertEqual(f.read().rstrip(b"\r\n"), b"example media file")
    
  282. 
    
  283.     def test_environ(self):
    
  284.         with self.urlopen("/environ_view/?%s" % urlencode({"q": "ั‚ะตัั‚"})) as f:
    
  285.             self.assertIn(b"QUERY_STRING: 'q=%D1%82%D0%B5%D1%81%D1%82'", f.read())
    
  286. 
    
  287. 
    
  288. @override_settings(ROOT_URLCONF="servers.urls")
    
  289. class SingleThreadLiveServerViews(SingleThreadLiveServerTestCase):
    
  290.     available_apps = ["servers"]
    
  291. 
    
  292.     def test_closes_connection_with_content_length(self):
    
  293.         """
    
  294.         Contrast to
    
  295.         LiveServerViews.test_keep_alive_on_connection_with_content_length().
    
  296.         Persistent connections require threading server.
    
  297.         """
    
  298.         conn = HTTPConnection(
    
  299.             SingleThreadLiveServerViews.server_thread.host,
    
  300.             SingleThreadLiveServerViews.server_thread.port,
    
  301.             timeout=1,
    
  302.         )
    
  303.         try:
    
  304.             conn.request("GET", "/example_view/", headers={"Connection": "keep-alive"})
    
  305.             response = conn.getresponse()
    
  306.             self.assertTrue(response.will_close)
    
  307.             self.assertEqual(response.read(), b"example view")
    
  308.             self.assertEqual(response.status, 200)
    
  309.             self.assertEqual(response.getheader("Connection"), "close")
    
  310.         finally:
    
  311.             conn.close()
    
  312. 
    
  313. 
    
  314. class LiveServerDatabase(LiveServerBase):
    
  315.     def test_fixtures_loaded(self):
    
  316.         """
    
  317.         Fixtures are properly loaded and visible to the live server thread.
    
  318.         """
    
  319.         with self.urlopen("/model_view/") as f:
    
  320.             self.assertCountEqual(f.read().splitlines(), [b"jane", b"robert"])
    
  321. 
    
  322.     def test_database_writes(self):
    
  323.         """
    
  324.         Data written to the database by a view can be read.
    
  325.         """
    
  326.         with self.urlopen("/create_model_instance/"):
    
  327.             pass
    
  328.         self.assertQuerysetEqual(
    
  329.             Person.objects.order_by("pk"),
    
  330.             ["jane", "robert", "emily"],
    
  331.             lambda b: b.name,
    
  332.         )
    
  333. 
    
  334. 
    
  335. class LiveServerPort(LiveServerBase):
    
  336.     def test_port_bind(self):
    
  337.         """
    
  338.         Each LiveServerTestCase binds to a unique port or fails to start a
    
  339.         server thread when run concurrently (#26011).
    
  340.         """
    
  341.         TestCase = type("TestCase", (LiveServerBase,), {})
    
  342.         try:
    
  343.             TestCase._start_server_thread()
    
  344.         except OSError as e:
    
  345.             if e.errno == errno.EADDRINUSE:
    
  346.                 # We're out of ports, LiveServerTestCase correctly fails with
    
  347.                 # an OSError.
    
  348.                 return
    
  349.             # Unexpected error.
    
  350.             raise
    
  351.         try:
    
  352.             self.assertNotEqual(
    
  353.                 self.live_server_url,
    
  354.                 TestCase.live_server_url,
    
  355.                 f"Acquired duplicate server addresses for server threads: "
    
  356.                 f"{self.live_server_url}",
    
  357.             )
    
  358.         finally:
    
  359.             TestCase.doClassCleanups()
    
  360. 
    
  361.     def test_specified_port_bind(self):
    
  362.         """LiveServerTestCase.port customizes the server's port."""
    
  363.         TestCase = type("TestCase", (LiveServerBase,), {})
    
  364.         # Find an open port and tell TestCase to use it.
    
  365.         s = socket.socket()
    
  366.         s.bind(("", 0))
    
  367.         TestCase.port = s.getsockname()[1]
    
  368.         s.close()
    
  369.         TestCase._start_server_thread()
    
  370.         try:
    
  371.             self.assertEqual(
    
  372.                 TestCase.port,
    
  373.                 TestCase.server_thread.port,
    
  374.                 f"Did not use specified port for LiveServerTestCase thread: "
    
  375.                 f"{TestCase.port}",
    
  376.             )
    
  377.         finally:
    
  378.             TestCase.doClassCleanups()
    
  379. 
    
  380. 
    
  381. class LiveServerThreadedTests(LiveServerBase):
    
  382.     """If LiveServerTestCase isn't threaded, these tests will hang."""
    
  383. 
    
  384.     def test_view_calls_subview(self):
    
  385.         url = "/subview_calling_view/?%s" % urlencode({"url": self.live_server_url})
    
  386.         with self.urlopen(url) as f:
    
  387.             self.assertEqual(f.read(), b"subview calling view: subview")
    
  388. 
    
  389.     def test_check_model_instance_from_subview(self):
    
  390.         url = "/check_model_instance_from_subview/?%s" % urlencode(
    
  391.             {
    
  392.                 "url": self.live_server_url,
    
  393.             }
    
  394.         )
    
  395.         with self.urlopen(url) as f:
    
  396.             self.assertIn(b"emily", f.read())