1. import unittest
    
  2. from contextlib import contextmanager
    
  3. from io import StringIO
    
  4. from unittest import mock
    
  5. 
    
  6. from django.core.exceptions import ImproperlyConfigured
    
  7. from django.db import DatabaseError, connection
    
  8. from django.db.backends.base.creation import BaseDatabaseCreation
    
  9. from django.test import SimpleTestCase
    
  10. 
    
  11. try:
    
  12.     import psycopg2  # NOQA
    
  13. except ImportError:
    
  14.     pass
    
  15. else:
    
  16.     from psycopg2 import errorcodes
    
  17. 
    
  18.     from django.db.backends.postgresql.creation import DatabaseCreation
    
  19. 
    
  20. 
    
  21. @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL tests")
    
  22. class DatabaseCreationTests(SimpleTestCase):
    
  23.     @contextmanager
    
  24.     def changed_test_settings(self, **kwargs):
    
  25.         settings = connection.settings_dict["TEST"]
    
  26.         saved_values = {}
    
  27.         for name in kwargs:
    
  28.             if name in settings:
    
  29.                 saved_values[name] = settings[name]
    
  30. 
    
  31.         for name, value in kwargs.items():
    
  32.             settings[name] = value
    
  33.         try:
    
  34.             yield
    
  35.         finally:
    
  36.             for name in kwargs:
    
  37.                 if name in saved_values:
    
  38.                     settings[name] = saved_values[name]
    
  39.                 else:
    
  40.                     del settings[name]
    
  41. 
    
  42.     def check_sql_table_creation_suffix(self, settings, expected):
    
  43.         with self.changed_test_settings(**settings):
    
  44.             creation = DatabaseCreation(connection)
    
  45.             suffix = creation.sql_table_creation_suffix()
    
  46.             self.assertEqual(suffix, expected)
    
  47. 
    
  48.     def test_sql_table_creation_suffix_with_none_settings(self):
    
  49.         settings = {"CHARSET": None, "TEMPLATE": None}
    
  50.         self.check_sql_table_creation_suffix(settings, "")
    
  51. 
    
  52.     def test_sql_table_creation_suffix_with_encoding(self):
    
  53.         settings = {"CHARSET": "UTF8"}
    
  54.         self.check_sql_table_creation_suffix(settings, "WITH ENCODING 'UTF8'")
    
  55. 
    
  56.     def test_sql_table_creation_suffix_with_template(self):
    
  57.         settings = {"TEMPLATE": "template0"}
    
  58.         self.check_sql_table_creation_suffix(settings, 'WITH TEMPLATE "template0"')
    
  59. 
    
  60.     def test_sql_table_creation_suffix_with_encoding_and_template(self):
    
  61.         settings = {"CHARSET": "UTF8", "TEMPLATE": "template0"}
    
  62.         self.check_sql_table_creation_suffix(
    
  63.             settings, '''WITH ENCODING 'UTF8' TEMPLATE "template0"'''
    
  64.         )
    
  65. 
    
  66.     def test_sql_table_creation_raises_with_collation(self):
    
  67.         settings = {"COLLATION": "test"}
    
  68.         msg = (
    
  69.             "PostgreSQL does not support collation setting at database "
    
  70.             "creation time."
    
  71.         )
    
  72.         with self.assertRaisesMessage(ImproperlyConfigured, msg):
    
  73.             self.check_sql_table_creation_suffix(settings, None)
    
  74. 
    
  75.     def _execute_raise_database_already_exists(self, cursor, parameters, keepdb=False):
    
  76.         error = DatabaseError("database %s already exists" % parameters["dbname"])
    
  77.         error.pgcode = errorcodes.DUPLICATE_DATABASE
    
  78.         raise DatabaseError() from error
    
  79. 
    
  80.     def _execute_raise_permission_denied(self, cursor, parameters, keepdb=False):
    
  81.         error = DatabaseError("permission denied to create database")
    
  82.         error.pgcode = errorcodes.INSUFFICIENT_PRIVILEGE
    
  83.         raise DatabaseError() from error
    
  84. 
    
  85.     def patch_test_db_creation(self, execute_create_test_db):
    
  86.         return mock.patch.object(
    
  87.             BaseDatabaseCreation, "_execute_create_test_db", execute_create_test_db
    
  88.         )
    
  89. 
    
  90.     @mock.patch("sys.stdout", new_callable=StringIO)
    
  91.     @mock.patch("sys.stderr", new_callable=StringIO)
    
  92.     def test_create_test_db(self, *mocked_objects):
    
  93.         creation = DatabaseCreation(connection)
    
  94.         # Simulate test database creation raising "database already exists"
    
  95.         with self.patch_test_db_creation(self._execute_raise_database_already_exists):
    
  96.             with mock.patch("builtins.input", return_value="no"):
    
  97.                 with self.assertRaises(SystemExit):
    
  98.                     # SystemExit is raised if the user answers "no" to the
    
  99.                     # prompt asking if it's okay to delete the test database.
    
  100.                     creation._create_test_db(
    
  101.                         verbosity=0, autoclobber=False, keepdb=False
    
  102.                     )
    
  103.             # "Database already exists" error is ignored when keepdb is on
    
  104.             creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
    
  105.         # Simulate test database creation raising unexpected error
    
  106.         with self.patch_test_db_creation(self._execute_raise_permission_denied):
    
  107.             with mock.patch.object(
    
  108.                 DatabaseCreation, "_database_exists", return_value=False
    
  109.             ):
    
  110.                 with self.assertRaises(SystemExit):
    
  111.                     creation._create_test_db(
    
  112.                         verbosity=0, autoclobber=False, keepdb=False
    
  113.                     )
    
  114.                 with self.assertRaises(SystemExit):
    
  115.                     creation._create_test_db(
    
  116.                         verbosity=0, autoclobber=False, keepdb=True
    
  117.                     )
    
  118.         # Simulate test database creation raising "insufficient privileges".
    
  119.         # An error shouldn't appear when keepdb is on and the database already
    
  120.         # exists.
    
  121.         with self.patch_test_db_creation(self._execute_raise_permission_denied):
    
  122.             with mock.patch.object(
    
  123.                 DatabaseCreation, "_database_exists", return_value=True
    
  124.             ):
    
  125.                 creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)