1. import decimal
    
  2. 
    
  3. from django.core.management.color import no_style
    
  4. from django.db import NotSupportedError, connection, transaction
    
  5. from django.db.backends.base.operations import BaseDatabaseOperations
    
  6. from django.db.models import DurationField, Value
    
  7. from django.test import (
    
  8.     SimpleTestCase,
    
  9.     TestCase,
    
  10.     TransactionTestCase,
    
  11.     override_settings,
    
  12.     skipIfDBFeature,
    
  13. )
    
  14. from django.utils import timezone
    
  15. 
    
  16. from ..models import Author, Book
    
  17. 
    
  18. 
    
  19. class SimpleDatabaseOperationTests(SimpleTestCase):
    
  20.     may_require_msg = "subclasses of BaseDatabaseOperations may require a %s() method"
    
  21. 
    
  22.     def setUp(self):
    
  23.         self.ops = BaseDatabaseOperations(connection=connection)
    
  24. 
    
  25.     def test_deferrable_sql(self):
    
  26.         self.assertEqual(self.ops.deferrable_sql(), "")
    
  27. 
    
  28.     def test_end_transaction_rollback(self):
    
  29.         self.assertEqual(self.ops.end_transaction_sql(success=False), "ROLLBACK;")
    
  30. 
    
  31.     def test_no_limit_value(self):
    
  32.         with self.assertRaisesMessage(
    
  33.             NotImplementedError, self.may_require_msg % "no_limit_value"
    
  34.         ):
    
  35.             self.ops.no_limit_value()
    
  36. 
    
  37.     def test_quote_name(self):
    
  38.         with self.assertRaisesMessage(
    
  39.             NotImplementedError, self.may_require_msg % "quote_name"
    
  40.         ):
    
  41.             self.ops.quote_name("a")
    
  42. 
    
  43.     def test_regex_lookup(self):
    
  44.         with self.assertRaisesMessage(
    
  45.             NotImplementedError, self.may_require_msg % "regex_lookup"
    
  46.         ):
    
  47.             self.ops.regex_lookup(lookup_type="regex")
    
  48. 
    
  49.     def test_set_time_zone_sql(self):
    
  50.         self.assertEqual(self.ops.set_time_zone_sql(), "")
    
  51. 
    
  52.     def test_sql_flush(self):
    
  53.         msg = "subclasses of BaseDatabaseOperations must provide an sql_flush() method"
    
  54.         with self.assertRaisesMessage(NotImplementedError, msg):
    
  55.             self.ops.sql_flush(None, None)
    
  56. 
    
  57.     def test_pk_default_value(self):
    
  58.         self.assertEqual(self.ops.pk_default_value(), "DEFAULT")
    
  59. 
    
  60.     def test_tablespace_sql(self):
    
  61.         self.assertEqual(self.ops.tablespace_sql(None), "")
    
  62. 
    
  63.     def test_sequence_reset_by_name_sql(self):
    
  64.         self.assertEqual(self.ops.sequence_reset_by_name_sql(None, []), [])
    
  65. 
    
  66.     def test_adapt_unknown_value_decimal(self):
    
  67.         value = decimal.Decimal("3.14")
    
  68.         self.assertEqual(
    
  69.             self.ops.adapt_unknown_value(value),
    
  70.             self.ops.adapt_decimalfield_value(value),
    
  71.         )
    
  72. 
    
  73.     def test_adapt_unknown_value_date(self):
    
  74.         value = timezone.now().date()
    
  75.         self.assertEqual(
    
  76.             self.ops.adapt_unknown_value(value), self.ops.adapt_datefield_value(value)
    
  77.         )
    
  78. 
    
  79.     def test_adapt_unknown_value_time(self):
    
  80.         value = timezone.now().time()
    
  81.         self.assertEqual(
    
  82.             self.ops.adapt_unknown_value(value), self.ops.adapt_timefield_value(value)
    
  83.         )
    
  84. 
    
  85.     def test_adapt_timefield_value_none(self):
    
  86.         self.assertIsNone(self.ops.adapt_timefield_value(None))
    
  87. 
    
  88.     def test_adapt_timefield_value_expression(self):
    
  89.         value = Value(timezone.now().time())
    
  90.         self.assertEqual(self.ops.adapt_timefield_value(value), value)
    
  91. 
    
  92.     def test_adapt_datetimefield_value_none(self):
    
  93.         self.assertIsNone(self.ops.adapt_datetimefield_value(None))
    
  94. 
    
  95.     def test_adapt_datetimefield_value_expression(self):
    
  96.         value = Value(timezone.now())
    
  97.         self.assertEqual(self.ops.adapt_datetimefield_value(value), value)
    
  98. 
    
  99.     def test_adapt_timefield_value(self):
    
  100.         msg = "Django does not support timezone-aware times."
    
  101.         with self.assertRaisesMessage(ValueError, msg):
    
  102.             self.ops.adapt_timefield_value(timezone.make_aware(timezone.now()))
    
  103. 
    
  104.     @override_settings(USE_TZ=False)
    
  105.     def test_adapt_timefield_value_unaware(self):
    
  106.         now = timezone.now()
    
  107.         self.assertEqual(self.ops.adapt_timefield_value(now), str(now))
    
  108. 
    
  109.     def test_format_for_duration_arithmetic(self):
    
  110.         msg = self.may_require_msg % "format_for_duration_arithmetic"
    
  111.         with self.assertRaisesMessage(NotImplementedError, msg):
    
  112.             self.ops.format_for_duration_arithmetic(None)
    
  113. 
    
  114.     def test_date_extract_sql(self):
    
  115.         with self.assertRaisesMessage(
    
  116.             NotImplementedError, self.may_require_msg % "date_extract_sql"
    
  117.         ):
    
  118.             self.ops.date_extract_sql(None, None, None)
    
  119. 
    
  120.     def test_time_extract_sql(self):
    
  121.         with self.assertRaisesMessage(
    
  122.             NotImplementedError, self.may_require_msg % "date_extract_sql"
    
  123.         ):
    
  124.             self.ops.time_extract_sql(None, None, None)
    
  125. 
    
  126.     def test_date_trunc_sql(self):
    
  127.         with self.assertRaisesMessage(
    
  128.             NotImplementedError, self.may_require_msg % "date_trunc_sql"
    
  129.         ):
    
  130.             self.ops.date_trunc_sql(None, None, None)
    
  131. 
    
  132.     def test_time_trunc_sql(self):
    
  133.         with self.assertRaisesMessage(
    
  134.             NotImplementedError, self.may_require_msg % "time_trunc_sql"
    
  135.         ):
    
  136.             self.ops.time_trunc_sql(None, None, None)
    
  137. 
    
  138.     def test_datetime_trunc_sql(self):
    
  139.         with self.assertRaisesMessage(
    
  140.             NotImplementedError, self.may_require_msg % "datetime_trunc_sql"
    
  141.         ):
    
  142.             self.ops.datetime_trunc_sql(None, None, None, None)
    
  143. 
    
  144.     def test_datetime_cast_date_sql(self):
    
  145.         with self.assertRaisesMessage(
    
  146.             NotImplementedError, self.may_require_msg % "datetime_cast_date_sql"
    
  147.         ):
    
  148.             self.ops.datetime_cast_date_sql(None, None, None)
    
  149. 
    
  150.     def test_datetime_cast_time_sql(self):
    
  151.         with self.assertRaisesMessage(
    
  152.             NotImplementedError, self.may_require_msg % "datetime_cast_time_sql"
    
  153.         ):
    
  154.             self.ops.datetime_cast_time_sql(None, None, None)
    
  155. 
    
  156.     def test_datetime_extract_sql(self):
    
  157.         with self.assertRaisesMessage(
    
  158.             NotImplementedError, self.may_require_msg % "datetime_extract_sql"
    
  159.         ):
    
  160.             self.ops.datetime_extract_sql(None, None, None, None)
    
  161. 
    
  162. 
    
  163. class DatabaseOperationTests(TestCase):
    
  164.     def setUp(self):
    
  165.         self.ops = BaseDatabaseOperations(connection=connection)
    
  166. 
    
  167.     @skipIfDBFeature("supports_over_clause")
    
  168.     def test_window_frame_raise_not_supported_error(self):
    
  169.         msg = "This backend does not support window expressions."
    
  170.         with self.assertRaisesMessage(NotSupportedError, msg):
    
  171.             self.ops.window_frame_rows_start_end()
    
  172. 
    
  173.     @skipIfDBFeature("can_distinct_on_fields")
    
  174.     def test_distinct_on_fields(self):
    
  175.         msg = "DISTINCT ON fields is not supported by this database backend"
    
  176.         with self.assertRaisesMessage(NotSupportedError, msg):
    
  177.             self.ops.distinct_sql(["a", "b"], None)
    
  178. 
    
  179.     @skipIfDBFeature("supports_temporal_subtraction")
    
  180.     def test_subtract_temporals(self):
    
  181.         duration_field = DurationField()
    
  182.         duration_field_internal_type = duration_field.get_internal_type()
    
  183.         msg = (
    
  184.             "This backend does not support %s subtraction."
    
  185.             % duration_field_internal_type
    
  186.         )
    
  187.         with self.assertRaisesMessage(NotSupportedError, msg):
    
  188.             self.ops.subtract_temporals(duration_field_internal_type, None, None)
    
  189. 
    
  190. 
    
  191. class SqlFlushTests(TransactionTestCase):
    
  192.     available_apps = ["backends"]
    
  193. 
    
  194.     def test_sql_flush_no_tables(self):
    
  195.         self.assertEqual(connection.ops.sql_flush(no_style(), []), [])
    
  196. 
    
  197.     def test_execute_sql_flush_statements(self):
    
  198.         with transaction.atomic():
    
  199.             author = Author.objects.create(name="George Orwell")
    
  200.             Book.objects.create(author=author)
    
  201.             author = Author.objects.create(name="Harper Lee")
    
  202.             Book.objects.create(author=author)
    
  203.             Book.objects.create(author=author)
    
  204.             self.assertIs(Author.objects.exists(), True)
    
  205.             self.assertIs(Book.objects.exists(), True)
    
  206. 
    
  207.         sql_list = connection.ops.sql_flush(
    
  208.             no_style(),
    
  209.             [Author._meta.db_table, Book._meta.db_table],
    
  210.             reset_sequences=True,
    
  211.             allow_cascade=True,
    
  212.         )
    
  213.         connection.ops.execute_sql_flush(sql_list)
    
  214. 
    
  215.         with transaction.atomic():
    
  216.             self.assertIs(Author.objects.exists(), False)
    
  217.             self.assertIs(Book.objects.exists(), False)
    
  218.             if connection.features.supports_sequence_reset:
    
  219.                 author = Author.objects.create(name="F. Scott Fitzgerald")
    
  220.                 self.assertEqual(author.pk, 1)
    
  221.                 book = Book.objects.create(author=author)
    
  222.                 self.assertEqual(book.pk, 1)