1. import subprocess
    
  2. import unittest
    
  3. from io import StringIO
    
  4. from unittest import mock
    
  5. 
    
  6. from django.db import DatabaseError, connection
    
  7. from django.db.backends.base.creation import BaseDatabaseCreation
    
  8. from django.db.backends.mysql.creation import DatabaseCreation
    
  9. from django.test import SimpleTestCase
    
  10. 
    
  11. 
    
  12. @unittest.skipUnless(connection.vendor == "mysql", "MySQL tests")
    
  13. class DatabaseCreationTests(SimpleTestCase):
    
  14.     def _execute_raise_database_exists(self, cursor, parameters, keepdb=False):
    
  15.         raise DatabaseError(
    
  16.             1007, "Can't create database '%s'; database exists" % parameters["dbname"]
    
  17.         )
    
  18. 
    
  19.     def _execute_raise_access_denied(self, cursor, parameters, keepdb=False):
    
  20.         raise DatabaseError(1044, "Access denied for user")
    
  21. 
    
  22.     def patch_test_db_creation(self, execute_create_test_db):
    
  23.         return mock.patch.object(
    
  24.             BaseDatabaseCreation, "_execute_create_test_db", execute_create_test_db
    
  25.         )
    
  26. 
    
  27.     @mock.patch("sys.stdout", new_callable=StringIO)
    
  28.     @mock.patch("sys.stderr", new_callable=StringIO)
    
  29.     def test_create_test_db_database_exists(self, *mocked_objects):
    
  30.         # Simulate test database creation raising "database exists"
    
  31.         creation = DatabaseCreation(connection)
    
  32.         with self.patch_test_db_creation(self._execute_raise_database_exists):
    
  33.             with mock.patch("builtins.input", return_value="no"):
    
  34.                 with self.assertRaises(SystemExit):
    
  35.                     # SystemExit is raised if the user answers "no" to the
    
  36.                     # prompt asking if it's okay to delete the test database.
    
  37.                     creation._create_test_db(
    
  38.                         verbosity=0, autoclobber=False, keepdb=False
    
  39.                     )
    
  40.             # "Database exists" shouldn't appear when keepdb is on
    
  41.             creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
    
  42. 
    
  43.     @mock.patch("sys.stdout", new_callable=StringIO)
    
  44.     @mock.patch("sys.stderr", new_callable=StringIO)
    
  45.     def test_create_test_db_unexpected_error(self, *mocked_objects):
    
  46.         # Simulate test database creation raising unexpected error
    
  47.         creation = DatabaseCreation(connection)
    
  48.         with self.patch_test_db_creation(self._execute_raise_access_denied):
    
  49.             with self.assertRaises(SystemExit):
    
  50.                 creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
    
  51. 
    
  52.     def test_clone_test_db_database_exists(self):
    
  53.         creation = DatabaseCreation(connection)
    
  54.         with self.patch_test_db_creation(self._execute_raise_database_exists):
    
  55.             with mock.patch.object(DatabaseCreation, "_clone_db") as _clone_db:
    
  56.                 creation._clone_test_db("suffix", verbosity=0, keepdb=True)
    
  57.                 _clone_db.assert_not_called()
    
  58. 
    
  59.     def test_clone_test_db_options_ordering(self):
    
  60.         creation = DatabaseCreation(connection)
    
  61.         try:
    
  62.             saved_settings = connection.settings_dict
    
  63.             connection.settings_dict = {
    
  64.                 "NAME": "source_db",
    
  65.                 "USER": "",
    
  66.                 "PASSWORD": "",
    
  67.                 "PORT": "",
    
  68.                 "HOST": "",
    
  69.                 "ENGINE": "django.db.backends.mysql",
    
  70.                 "OPTIONS": {
    
  71.                     "read_default_file": "my.cnf",
    
  72.                 },
    
  73.             }
    
  74.             with mock.patch.object(subprocess, "Popen") as mocked_popen:
    
  75.                 creation._clone_db("source_db", "target_db")
    
  76.                 mocked_popen.assert_has_calls(
    
  77.                     [
    
  78.                         mock.call(
    
  79.                             [
    
  80.                                 "mysqldump",
    
  81.                                 "--defaults-file=my.cnf",
    
  82.                                 "--routines",
    
  83.                                 "--events",
    
  84.                                 "source_db",
    
  85.                             ],
    
  86.                             stdout=subprocess.PIPE,
    
  87.                             env=None,
    
  88.                         ),
    
  89.                     ]
    
  90.                 )
    
  91.         finally:
    
  92.             connection.settings_dict = saved_settings