1. import datetime
    
  2. from unittest import mock
    
  3. 
    
  4. from django.contrib.postgres.indexes import OpClass
    
  5. from django.core.exceptions import ValidationError
    
  6. from django.db import IntegrityError, NotSupportedError, connection, transaction
    
  7. from django.db.models import (
    
  8.     CheckConstraint,
    
  9.     Deferrable,
    
  10.     F,
    
  11.     Func,
    
  12.     IntegerField,
    
  13.     Model,
    
  14.     Q,
    
  15.     UniqueConstraint,
    
  16. )
    
  17. from django.db.models.fields.json import KeyTextTransform
    
  18. from django.db.models.functions import Cast, Left, Lower
    
  19. from django.test import ignore_warnings, modify_settings, skipUnlessDBFeature
    
  20. from django.test.utils import isolate_apps
    
  21. from django.utils import timezone
    
  22. from django.utils.deprecation import RemovedInDjango50Warning
    
  23. 
    
  24. from . import PostgreSQLTestCase
    
  25. from .models import HotelReservation, IntegerArrayModel, RangesModel, Room, Scene
    
  26. 
    
  27. try:
    
  28.     from psycopg2.extras import DateRange, NumericRange
    
  29. 
    
  30.     from django.contrib.postgres.constraints import ExclusionConstraint
    
  31.     from django.contrib.postgres.fields import (
    
  32.         DateTimeRangeField,
    
  33.         RangeBoundary,
    
  34.         RangeOperators,
    
  35.     )
    
  36. except ImportError:
    
  37.     pass
    
  38. 
    
  39. 
    
  40. @modify_settings(INSTALLED_APPS={"append": "django.contrib.postgres"})
    
  41. class SchemaTests(PostgreSQLTestCase):
    
  42.     get_opclass_query = """
    
  43.         SELECT opcname, c.relname FROM pg_opclass AS oc
    
  44.         JOIN pg_index as i on oc.oid = ANY(i.indclass)
    
  45.         JOIN pg_class as c on c.oid = i.indexrelid
    
  46.         WHERE c.relname = %s
    
  47.     """
    
  48. 
    
  49.     def get_constraints(self, table):
    
  50.         """Get the constraints on the table using a new cursor."""
    
  51.         with connection.cursor() as cursor:
    
  52.             return connection.introspection.get_constraints(cursor, table)
    
  53. 
    
  54.     def test_check_constraint_range_value(self):
    
  55.         constraint_name = "ints_between"
    
  56.         self.assertNotIn(
    
  57.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  58.         )
    
  59.         constraint = CheckConstraint(
    
  60.             check=Q(ints__contained_by=NumericRange(10, 30)),
    
  61.             name=constraint_name,
    
  62.         )
    
  63.         with connection.schema_editor() as editor:
    
  64.             editor.add_constraint(RangesModel, constraint)
    
  65.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  66.         with self.assertRaises(IntegrityError), transaction.atomic():
    
  67.             RangesModel.objects.create(ints=(20, 50))
    
  68.         RangesModel.objects.create(ints=(10, 30))
    
  69. 
    
  70.     def test_check_constraint_array_length(self):
    
  71.         constraint = CheckConstraint(
    
  72.             check=Q(field__len=1),
    
  73.             name="array_length",
    
  74.         )
    
  75.         msg = f"Constraint “{constraint.name}” is violated."
    
  76.         with self.assertRaisesMessage(ValidationError, msg):
    
  77.             constraint.validate(IntegerArrayModel, IntegerArrayModel())
    
  78.         constraint.validate(IntegerArrayModel, IntegerArrayModel(field=[1]))
    
  79. 
    
  80.     def test_check_constraint_daterange_contains(self):
    
  81.         constraint_name = "dates_contains"
    
  82.         self.assertNotIn(
    
  83.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  84.         )
    
  85.         constraint = CheckConstraint(
    
  86.             check=Q(dates__contains=F("dates_inner")),
    
  87.             name=constraint_name,
    
  88.         )
    
  89.         with connection.schema_editor() as editor:
    
  90.             editor.add_constraint(RangesModel, constraint)
    
  91.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  92.         date_1 = datetime.date(2016, 1, 1)
    
  93.         date_2 = datetime.date(2016, 1, 4)
    
  94.         with self.assertRaises(IntegrityError), transaction.atomic():
    
  95.             RangesModel.objects.create(
    
  96.                 dates=(date_1, date_2),
    
  97.                 dates_inner=(date_1, date_2.replace(day=5)),
    
  98.             )
    
  99.         RangesModel.objects.create(
    
  100.             dates=(date_1, date_2),
    
  101.             dates_inner=(date_1, date_2),
    
  102.         )
    
  103. 
    
  104.     def test_check_constraint_datetimerange_contains(self):
    
  105.         constraint_name = "timestamps_contains"
    
  106.         self.assertNotIn(
    
  107.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  108.         )
    
  109.         constraint = CheckConstraint(
    
  110.             check=Q(timestamps__contains=F("timestamps_inner")),
    
  111.             name=constraint_name,
    
  112.         )
    
  113.         with connection.schema_editor() as editor:
    
  114.             editor.add_constraint(RangesModel, constraint)
    
  115.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  116.         datetime_1 = datetime.datetime(2016, 1, 1)
    
  117.         datetime_2 = datetime.datetime(2016, 1, 2, 12)
    
  118.         with self.assertRaises(IntegrityError), transaction.atomic():
    
  119.             RangesModel.objects.create(
    
  120.                 timestamps=(datetime_1, datetime_2),
    
  121.                 timestamps_inner=(datetime_1, datetime_2.replace(hour=13)),
    
  122.             )
    
  123.         RangesModel.objects.create(
    
  124.             timestamps=(datetime_1, datetime_2),
    
  125.             timestamps_inner=(datetime_1, datetime_2),
    
  126.         )
    
  127. 
    
  128.     def test_check_constraint_range_contains(self):
    
  129.         constraint = CheckConstraint(
    
  130.             check=Q(ints__contains=(1, 5)),
    
  131.             name="ints_contains",
    
  132.         )
    
  133.         msg = f"Constraint “{constraint.name}” is violated."
    
  134.         with self.assertRaisesMessage(ValidationError, msg):
    
  135.             constraint.validate(RangesModel, RangesModel(ints=(6, 10)))
    
  136. 
    
  137.     def test_check_constraint_range_lower_upper(self):
    
  138.         constraint = CheckConstraint(
    
  139.             check=Q(ints__startswith__gte=0) & Q(ints__endswith__lte=99),
    
  140.             name="ints_range_lower_upper",
    
  141.         )
    
  142.         msg = f"Constraint “{constraint.name}” is violated."
    
  143.         with self.assertRaisesMessage(ValidationError, msg):
    
  144.             constraint.validate(RangesModel, RangesModel(ints=(-1, 20)))
    
  145.         with self.assertRaisesMessage(ValidationError, msg):
    
  146.             constraint.validate(RangesModel, RangesModel(ints=(0, 100)))
    
  147.         constraint.validate(RangesModel, RangesModel(ints=(0, 99)))
    
  148. 
    
  149.     def test_check_constraint_range_lower_with_nulls(self):
    
  150.         constraint = CheckConstraint(
    
  151.             check=Q(ints__isnull=True) | Q(ints__startswith__gte=0),
    
  152.             name="ints_optional_positive_range",
    
  153.         )
    
  154.         constraint.validate(RangesModel, RangesModel())
    
  155.         constraint = CheckConstraint(
    
  156.             check=Q(ints__startswith__gte=0),
    
  157.             name="ints_positive_range",
    
  158.         )
    
  159.         constraint.validate(RangesModel, RangesModel())
    
  160. 
    
  161.     def test_opclass(self):
    
  162.         constraint = UniqueConstraint(
    
  163.             name="test_opclass",
    
  164.             fields=["scene"],
    
  165.             opclasses=["varchar_pattern_ops"],
    
  166.         )
    
  167.         with connection.schema_editor() as editor:
    
  168.             editor.add_constraint(Scene, constraint)
    
  169.         self.assertIn(constraint.name, self.get_constraints(Scene._meta.db_table))
    
  170.         with editor.connection.cursor() as cursor:
    
  171.             cursor.execute(self.get_opclass_query, [constraint.name])
    
  172.             self.assertEqual(
    
  173.                 cursor.fetchall(),
    
  174.                 [("varchar_pattern_ops", constraint.name)],
    
  175.             )
    
  176.         # Drop the constraint.
    
  177.         with connection.schema_editor() as editor:
    
  178.             editor.remove_constraint(Scene, constraint)
    
  179.         self.assertNotIn(constraint.name, self.get_constraints(Scene._meta.db_table))
    
  180. 
    
  181.     def test_opclass_multiple_columns(self):
    
  182.         constraint = UniqueConstraint(
    
  183.             name="test_opclass_multiple",
    
  184.             fields=["scene", "setting"],
    
  185.             opclasses=["varchar_pattern_ops", "text_pattern_ops"],
    
  186.         )
    
  187.         with connection.schema_editor() as editor:
    
  188.             editor.add_constraint(Scene, constraint)
    
  189.         with editor.connection.cursor() as cursor:
    
  190.             cursor.execute(self.get_opclass_query, [constraint.name])
    
  191.             expected_opclasses = (
    
  192.                 ("varchar_pattern_ops", constraint.name),
    
  193.                 ("text_pattern_ops", constraint.name),
    
  194.             )
    
  195.             self.assertCountEqual(cursor.fetchall(), expected_opclasses)
    
  196. 
    
  197.     def test_opclass_partial(self):
    
  198.         constraint = UniqueConstraint(
    
  199.             name="test_opclass_partial",
    
  200.             fields=["scene"],
    
  201.             opclasses=["varchar_pattern_ops"],
    
  202.             condition=Q(setting__contains="Sir Bedemir's Castle"),
    
  203.         )
    
  204.         with connection.schema_editor() as editor:
    
  205.             editor.add_constraint(Scene, constraint)
    
  206.         with editor.connection.cursor() as cursor:
    
  207.             cursor.execute(self.get_opclass_query, [constraint.name])
    
  208.             self.assertCountEqual(
    
  209.                 cursor.fetchall(),
    
  210.                 [("varchar_pattern_ops", constraint.name)],
    
  211.             )
    
  212. 
    
  213.     @skipUnlessDBFeature("supports_covering_indexes")
    
  214.     def test_opclass_include(self):
    
  215.         constraint = UniqueConstraint(
    
  216.             name="test_opclass_include",
    
  217.             fields=["scene"],
    
  218.             opclasses=["varchar_pattern_ops"],
    
  219.             include=["setting"],
    
  220.         )
    
  221.         with connection.schema_editor() as editor:
    
  222.             editor.add_constraint(Scene, constraint)
    
  223.         with editor.connection.cursor() as cursor:
    
  224.             cursor.execute(self.get_opclass_query, [constraint.name])
    
  225.             self.assertCountEqual(
    
  226.                 cursor.fetchall(),
    
  227.                 [("varchar_pattern_ops", constraint.name)],
    
  228.             )
    
  229. 
    
  230.     @skipUnlessDBFeature("supports_expression_indexes")
    
  231.     def test_opclass_func(self):
    
  232.         constraint = UniqueConstraint(
    
  233.             OpClass(Lower("scene"), name="text_pattern_ops"),
    
  234.             name="test_opclass_func",
    
  235.         )
    
  236.         with connection.schema_editor() as editor:
    
  237.             editor.add_constraint(Scene, constraint)
    
  238.         constraints = self.get_constraints(Scene._meta.db_table)
    
  239.         self.assertIs(constraints[constraint.name]["unique"], True)
    
  240.         self.assertIn(constraint.name, constraints)
    
  241.         with editor.connection.cursor() as cursor:
    
  242.             cursor.execute(self.get_opclass_query, [constraint.name])
    
  243.             self.assertEqual(
    
  244.                 cursor.fetchall(),
    
  245.                 [("text_pattern_ops", constraint.name)],
    
  246.             )
    
  247.         Scene.objects.create(scene="Scene 10", setting="The dark forest of Ewing")
    
  248.         with self.assertRaises(IntegrityError), transaction.atomic():
    
  249.             Scene.objects.create(scene="ScEnE 10", setting="Sir Bedemir's Castle")
    
  250.         Scene.objects.create(scene="Scene 5", setting="Sir Bedemir's Castle")
    
  251.         # Drop the constraint.
    
  252.         with connection.schema_editor() as editor:
    
  253.             editor.remove_constraint(Scene, constraint)
    
  254.         self.assertNotIn(constraint.name, self.get_constraints(Scene._meta.db_table))
    
  255.         Scene.objects.create(scene="ScEnE 10", setting="Sir Bedemir's Castle")
    
  256. 
    
  257. 
    
  258. @modify_settings(INSTALLED_APPS={"append": "django.contrib.postgres"})
    
  259. class ExclusionConstraintTests(PostgreSQLTestCase):
    
  260.     def get_constraints(self, table):
    
  261.         """Get the constraints on the table using a new cursor."""
    
  262.         with connection.cursor() as cursor:
    
  263.             return connection.introspection.get_constraints(cursor, table)
    
  264. 
    
  265.     def test_invalid_condition(self):
    
  266.         msg = "ExclusionConstraint.condition must be a Q instance."
    
  267.         with self.assertRaisesMessage(ValueError, msg):
    
  268.             ExclusionConstraint(
    
  269.                 index_type="GIST",
    
  270.                 name="exclude_invalid_condition",
    
  271.                 expressions=[(F("datespan"), RangeOperators.OVERLAPS)],
    
  272.                 condition=F("invalid"),
    
  273.             )
    
  274. 
    
  275.     def test_invalid_index_type(self):
    
  276.         msg = "Exclusion constraints only support GiST or SP-GiST indexes."
    
  277.         with self.assertRaisesMessage(ValueError, msg):
    
  278.             ExclusionConstraint(
    
  279.                 index_type="gin",
    
  280.                 name="exclude_invalid_index_type",
    
  281.                 expressions=[(F("datespan"), RangeOperators.OVERLAPS)],
    
  282.             )
    
  283. 
    
  284.     def test_invalid_expressions(self):
    
  285.         msg = "The expressions must be a list of 2-tuples."
    
  286.         for expressions in (["foo"], [("foo")], [("foo_1", "foo_2", "foo_3")]):
    
  287.             with self.subTest(expressions), self.assertRaisesMessage(ValueError, msg):
    
  288.                 ExclusionConstraint(
    
  289.                     index_type="GIST",
    
  290.                     name="exclude_invalid_expressions",
    
  291.                     expressions=expressions,
    
  292.                 )
    
  293. 
    
  294.     def test_empty_expressions(self):
    
  295.         msg = "At least one expression is required to define an exclusion constraint."
    
  296.         for empty_expressions in (None, []):
    
  297.             with self.subTest(empty_expressions), self.assertRaisesMessage(
    
  298.                 ValueError, msg
    
  299.             ):
    
  300.                 ExclusionConstraint(
    
  301.                     index_type="GIST",
    
  302.                     name="exclude_empty_expressions",
    
  303.                     expressions=empty_expressions,
    
  304.                 )
    
  305. 
    
  306.     def test_invalid_deferrable(self):
    
  307.         msg = "ExclusionConstraint.deferrable must be a Deferrable instance."
    
  308.         with self.assertRaisesMessage(ValueError, msg):
    
  309.             ExclusionConstraint(
    
  310.                 name="exclude_invalid_deferrable",
    
  311.                 expressions=[(F("datespan"), RangeOperators.OVERLAPS)],
    
  312.                 deferrable="invalid",
    
  313.             )
    
  314. 
    
  315.     def test_deferrable_with_condition(self):
    
  316.         msg = "ExclusionConstraint with conditions cannot be deferred."
    
  317.         with self.assertRaisesMessage(ValueError, msg):
    
  318.             ExclusionConstraint(
    
  319.                 name="exclude_invalid_condition",
    
  320.                 expressions=[(F("datespan"), RangeOperators.OVERLAPS)],
    
  321.                 condition=Q(cancelled=False),
    
  322.                 deferrable=Deferrable.DEFERRED,
    
  323.             )
    
  324. 
    
  325.     def test_invalid_include_type(self):
    
  326.         msg = "ExclusionConstraint.include must be a list or tuple."
    
  327.         with self.assertRaisesMessage(ValueError, msg):
    
  328.             ExclusionConstraint(
    
  329.                 name="exclude_invalid_include",
    
  330.                 expressions=[(F("datespan"), RangeOperators.OVERLAPS)],
    
  331.                 include="invalid",
    
  332.             )
    
  333. 
    
  334.     @ignore_warnings(category=RemovedInDjango50Warning)
    
  335.     def test_invalid_opclasses_type(self):
    
  336.         msg = "ExclusionConstraint.opclasses must be a list or tuple."
    
  337.         with self.assertRaisesMessage(ValueError, msg):
    
  338.             ExclusionConstraint(
    
  339.                 name="exclude_invalid_opclasses",
    
  340.                 expressions=[(F("datespan"), RangeOperators.OVERLAPS)],
    
  341.                 opclasses="invalid",
    
  342.             )
    
  343. 
    
  344.     @ignore_warnings(category=RemovedInDjango50Warning)
    
  345.     def test_opclasses_and_expressions_same_length(self):
    
  346.         msg = (
    
  347.             "ExclusionConstraint.expressions and "
    
  348.             "ExclusionConstraint.opclasses must have the same number of "
    
  349.             "elements."
    
  350.         )
    
  351.         with self.assertRaisesMessage(ValueError, msg):
    
  352.             ExclusionConstraint(
    
  353.                 name="exclude_invalid_expressions_opclasses_length",
    
  354.                 expressions=[(F("datespan"), RangeOperators.OVERLAPS)],
    
  355.                 opclasses=["foo", "bar"],
    
  356.             )
    
  357. 
    
  358.     def test_repr(self):
    
  359.         constraint = ExclusionConstraint(
    
  360.             name="exclude_overlapping",
    
  361.             expressions=[
    
  362.                 (F("datespan"), RangeOperators.OVERLAPS),
    
  363.                 (F("room"), RangeOperators.EQUAL),
    
  364.             ],
    
  365.         )
    
  366.         self.assertEqual(
    
  367.             repr(constraint),
    
  368.             "<ExclusionConstraint: index_type='GIST' expressions=["
    
  369.             "(F(datespan), '&&'), (F(room), '=')] name='exclude_overlapping'>",
    
  370.         )
    
  371.         constraint = ExclusionConstraint(
    
  372.             name="exclude_overlapping",
    
  373.             expressions=[(F("datespan"), RangeOperators.ADJACENT_TO)],
    
  374.             condition=Q(cancelled=False),
    
  375.             index_type="SPGiST",
    
  376.         )
    
  377.         self.assertEqual(
    
  378.             repr(constraint),
    
  379.             "<ExclusionConstraint: index_type='SPGiST' expressions=["
    
  380.             "(F(datespan), '-|-')] name='exclude_overlapping' "
    
  381.             "condition=(AND: ('cancelled', False))>",
    
  382.         )
    
  383.         constraint = ExclusionConstraint(
    
  384.             name="exclude_overlapping",
    
  385.             expressions=[(F("datespan"), RangeOperators.ADJACENT_TO)],
    
  386.             deferrable=Deferrable.IMMEDIATE,
    
  387.         )
    
  388.         self.assertEqual(
    
  389.             repr(constraint),
    
  390.             "<ExclusionConstraint: index_type='GIST' expressions=["
    
  391.             "(F(datespan), '-|-')] name='exclude_overlapping' "
    
  392.             "deferrable=Deferrable.IMMEDIATE>",
    
  393.         )
    
  394.         constraint = ExclusionConstraint(
    
  395.             name="exclude_overlapping",
    
  396.             expressions=[(F("datespan"), RangeOperators.ADJACENT_TO)],
    
  397.             include=["cancelled", "room"],
    
  398.         )
    
  399.         self.assertEqual(
    
  400.             repr(constraint),
    
  401.             "<ExclusionConstraint: index_type='GIST' expressions=["
    
  402.             "(F(datespan), '-|-')] name='exclude_overlapping' "
    
  403.             "include=('cancelled', 'room')>",
    
  404.         )
    
  405.         constraint = ExclusionConstraint(
    
  406.             name="exclude_overlapping",
    
  407.             expressions=[
    
  408.                 (OpClass("datespan", name="range_ops"), RangeOperators.ADJACENT_TO),
    
  409.             ],
    
  410.         )
    
  411.         self.assertEqual(
    
  412.             repr(constraint),
    
  413.             "<ExclusionConstraint: index_type='GIST' expressions=["
    
  414.             "(OpClass(F(datespan), name=range_ops), '-|-')] "
    
  415.             "name='exclude_overlapping'>",
    
  416.         )
    
  417. 
    
  418.     def test_eq(self):
    
  419.         constraint_1 = ExclusionConstraint(
    
  420.             name="exclude_overlapping",
    
  421.             expressions=[
    
  422.                 (F("datespan"), RangeOperators.OVERLAPS),
    
  423.                 (F("room"), RangeOperators.EQUAL),
    
  424.             ],
    
  425.             condition=Q(cancelled=False),
    
  426.         )
    
  427.         constraint_2 = ExclusionConstraint(
    
  428.             name="exclude_overlapping",
    
  429.             expressions=[
    
  430.                 ("datespan", RangeOperators.OVERLAPS),
    
  431.                 ("room", RangeOperators.EQUAL),
    
  432.             ],
    
  433.         )
    
  434.         constraint_3 = ExclusionConstraint(
    
  435.             name="exclude_overlapping",
    
  436.             expressions=[("datespan", RangeOperators.OVERLAPS)],
    
  437.             condition=Q(cancelled=False),
    
  438.         )
    
  439.         constraint_4 = ExclusionConstraint(
    
  440.             name="exclude_overlapping",
    
  441.             expressions=[
    
  442.                 ("datespan", RangeOperators.OVERLAPS),
    
  443.                 ("room", RangeOperators.EQUAL),
    
  444.             ],
    
  445.             deferrable=Deferrable.DEFERRED,
    
  446.         )
    
  447.         constraint_5 = ExclusionConstraint(
    
  448.             name="exclude_overlapping",
    
  449.             expressions=[
    
  450.                 ("datespan", RangeOperators.OVERLAPS),
    
  451.                 ("room", RangeOperators.EQUAL),
    
  452.             ],
    
  453.             deferrable=Deferrable.IMMEDIATE,
    
  454.         )
    
  455.         constraint_6 = ExclusionConstraint(
    
  456.             name="exclude_overlapping",
    
  457.             expressions=[
    
  458.                 ("datespan", RangeOperators.OVERLAPS),
    
  459.                 ("room", RangeOperators.EQUAL),
    
  460.             ],
    
  461.             deferrable=Deferrable.IMMEDIATE,
    
  462.             include=["cancelled"],
    
  463.         )
    
  464.         constraint_7 = ExclusionConstraint(
    
  465.             name="exclude_overlapping",
    
  466.             expressions=[
    
  467.                 ("datespan", RangeOperators.OVERLAPS),
    
  468.                 ("room", RangeOperators.EQUAL),
    
  469.             ],
    
  470.             include=["cancelled"],
    
  471.         )
    
  472.         with ignore_warnings(category=RemovedInDjango50Warning):
    
  473.             constraint_8 = ExclusionConstraint(
    
  474.                 name="exclude_overlapping",
    
  475.                 expressions=[
    
  476.                     ("datespan", RangeOperators.OVERLAPS),
    
  477.                     ("room", RangeOperators.EQUAL),
    
  478.                 ],
    
  479.                 include=["cancelled"],
    
  480.                 opclasses=["range_ops", "range_ops"],
    
  481.             )
    
  482.             constraint_9 = ExclusionConstraint(
    
  483.                 name="exclude_overlapping",
    
  484.                 expressions=[
    
  485.                     ("datespan", RangeOperators.OVERLAPS),
    
  486.                     ("room", RangeOperators.EQUAL),
    
  487.                 ],
    
  488.                 opclasses=["range_ops", "range_ops"],
    
  489.             )
    
  490.             self.assertNotEqual(constraint_2, constraint_9)
    
  491.             self.assertNotEqual(constraint_7, constraint_8)
    
  492. 
    
  493.         constraint_10 = ExclusionConstraint(
    
  494.             name="exclude_overlapping",
    
  495.             expressions=[
    
  496.                 (F("datespan"), RangeOperators.OVERLAPS),
    
  497.                 (F("room"), RangeOperators.EQUAL),
    
  498.             ],
    
  499.             condition=Q(cancelled=False),
    
  500.             violation_error_message="custom error",
    
  501.         )
    
  502.         constraint_11 = ExclusionConstraint(
    
  503.             name="exclude_overlapping",
    
  504.             expressions=[
    
  505.                 (F("datespan"), RangeOperators.OVERLAPS),
    
  506.                 (F("room"), RangeOperators.EQUAL),
    
  507.             ],
    
  508.             condition=Q(cancelled=False),
    
  509.             violation_error_message="other custom error",
    
  510.         )
    
  511.         self.assertEqual(constraint_1, constraint_1)
    
  512.         self.assertEqual(constraint_1, mock.ANY)
    
  513.         self.assertNotEqual(constraint_1, constraint_2)
    
  514.         self.assertNotEqual(constraint_1, constraint_3)
    
  515.         self.assertNotEqual(constraint_1, constraint_4)
    
  516.         self.assertNotEqual(constraint_1, constraint_10)
    
  517.         self.assertNotEqual(constraint_2, constraint_3)
    
  518.         self.assertNotEqual(constraint_2, constraint_4)
    
  519.         self.assertNotEqual(constraint_2, constraint_7)
    
  520.         self.assertNotEqual(constraint_4, constraint_5)
    
  521.         self.assertNotEqual(constraint_5, constraint_6)
    
  522.         self.assertNotEqual(constraint_1, object())
    
  523.         self.assertNotEqual(constraint_10, constraint_11)
    
  524.         self.assertEqual(constraint_10, constraint_10)
    
  525. 
    
  526.     def test_deconstruct(self):
    
  527.         constraint = ExclusionConstraint(
    
  528.             name="exclude_overlapping",
    
  529.             expressions=[
    
  530.                 ("datespan", RangeOperators.OVERLAPS),
    
  531.                 ("room", RangeOperators.EQUAL),
    
  532.             ],
    
  533.         )
    
  534.         path, args, kwargs = constraint.deconstruct()
    
  535.         self.assertEqual(
    
  536.             path, "django.contrib.postgres.constraints.ExclusionConstraint"
    
  537.         )
    
  538.         self.assertEqual(args, ())
    
  539.         self.assertEqual(
    
  540.             kwargs,
    
  541.             {
    
  542.                 "name": "exclude_overlapping",
    
  543.                 "expressions": [
    
  544.                     ("datespan", RangeOperators.OVERLAPS),
    
  545.                     ("room", RangeOperators.EQUAL),
    
  546.                 ],
    
  547.             },
    
  548.         )
    
  549. 
    
  550.     def test_deconstruct_index_type(self):
    
  551.         constraint = ExclusionConstraint(
    
  552.             name="exclude_overlapping",
    
  553.             index_type="SPGIST",
    
  554.             expressions=[
    
  555.                 ("datespan", RangeOperators.OVERLAPS),
    
  556.                 ("room", RangeOperators.EQUAL),
    
  557.             ],
    
  558.         )
    
  559.         path, args, kwargs = constraint.deconstruct()
    
  560.         self.assertEqual(
    
  561.             path, "django.contrib.postgres.constraints.ExclusionConstraint"
    
  562.         )
    
  563.         self.assertEqual(args, ())
    
  564.         self.assertEqual(
    
  565.             kwargs,
    
  566.             {
    
  567.                 "name": "exclude_overlapping",
    
  568.                 "index_type": "SPGIST",
    
  569.                 "expressions": [
    
  570.                     ("datespan", RangeOperators.OVERLAPS),
    
  571.                     ("room", RangeOperators.EQUAL),
    
  572.                 ],
    
  573.             },
    
  574.         )
    
  575. 
    
  576.     def test_deconstruct_condition(self):
    
  577.         constraint = ExclusionConstraint(
    
  578.             name="exclude_overlapping",
    
  579.             expressions=[
    
  580.                 ("datespan", RangeOperators.OVERLAPS),
    
  581.                 ("room", RangeOperators.EQUAL),
    
  582.             ],
    
  583.             condition=Q(cancelled=False),
    
  584.         )
    
  585.         path, args, kwargs = constraint.deconstruct()
    
  586.         self.assertEqual(
    
  587.             path, "django.contrib.postgres.constraints.ExclusionConstraint"
    
  588.         )
    
  589.         self.assertEqual(args, ())
    
  590.         self.assertEqual(
    
  591.             kwargs,
    
  592.             {
    
  593.                 "name": "exclude_overlapping",
    
  594.                 "expressions": [
    
  595.                     ("datespan", RangeOperators.OVERLAPS),
    
  596.                     ("room", RangeOperators.EQUAL),
    
  597.                 ],
    
  598.                 "condition": Q(cancelled=False),
    
  599.             },
    
  600.         )
    
  601. 
    
  602.     def test_deconstruct_deferrable(self):
    
  603.         constraint = ExclusionConstraint(
    
  604.             name="exclude_overlapping",
    
  605.             expressions=[("datespan", RangeOperators.OVERLAPS)],
    
  606.             deferrable=Deferrable.DEFERRED,
    
  607.         )
    
  608.         path, args, kwargs = constraint.deconstruct()
    
  609.         self.assertEqual(
    
  610.             path, "django.contrib.postgres.constraints.ExclusionConstraint"
    
  611.         )
    
  612.         self.assertEqual(args, ())
    
  613.         self.assertEqual(
    
  614.             kwargs,
    
  615.             {
    
  616.                 "name": "exclude_overlapping",
    
  617.                 "expressions": [("datespan", RangeOperators.OVERLAPS)],
    
  618.                 "deferrable": Deferrable.DEFERRED,
    
  619.             },
    
  620.         )
    
  621. 
    
  622.     def test_deconstruct_include(self):
    
  623.         constraint = ExclusionConstraint(
    
  624.             name="exclude_overlapping",
    
  625.             expressions=[("datespan", RangeOperators.OVERLAPS)],
    
  626.             include=["cancelled", "room"],
    
  627.         )
    
  628.         path, args, kwargs = constraint.deconstruct()
    
  629.         self.assertEqual(
    
  630.             path, "django.contrib.postgres.constraints.ExclusionConstraint"
    
  631.         )
    
  632.         self.assertEqual(args, ())
    
  633.         self.assertEqual(
    
  634.             kwargs,
    
  635.             {
    
  636.                 "name": "exclude_overlapping",
    
  637.                 "expressions": [("datespan", RangeOperators.OVERLAPS)],
    
  638.                 "include": ("cancelled", "room"),
    
  639.             },
    
  640.         )
    
  641. 
    
  642.     @ignore_warnings(category=RemovedInDjango50Warning)
    
  643.     def test_deconstruct_opclasses(self):
    
  644.         constraint = ExclusionConstraint(
    
  645.             name="exclude_overlapping",
    
  646.             expressions=[("datespan", RangeOperators.OVERLAPS)],
    
  647.             opclasses=["range_ops"],
    
  648.         )
    
  649.         path, args, kwargs = constraint.deconstruct()
    
  650.         self.assertEqual(
    
  651.             path, "django.contrib.postgres.constraints.ExclusionConstraint"
    
  652.         )
    
  653.         self.assertEqual(args, ())
    
  654.         self.assertEqual(
    
  655.             kwargs,
    
  656.             {
    
  657.                 "name": "exclude_overlapping",
    
  658.                 "expressions": [("datespan", RangeOperators.OVERLAPS)],
    
  659.                 "opclasses": ["range_ops"],
    
  660.             },
    
  661.         )
    
  662. 
    
  663.     def _test_range_overlaps(self, constraint):
    
  664.         # Create exclusion constraint.
    
  665.         self.assertNotIn(
    
  666.             constraint.name, self.get_constraints(HotelReservation._meta.db_table)
    
  667.         )
    
  668.         with connection.schema_editor() as editor:
    
  669.             editor.add_constraint(HotelReservation, constraint)
    
  670.         self.assertIn(
    
  671.             constraint.name, self.get_constraints(HotelReservation._meta.db_table)
    
  672.         )
    
  673.         # Add initial reservations.
    
  674.         room101 = Room.objects.create(number=101)
    
  675.         room102 = Room.objects.create(number=102)
    
  676.         datetimes = [
    
  677.             timezone.datetime(2018, 6, 20),
    
  678.             timezone.datetime(2018, 6, 24),
    
  679.             timezone.datetime(2018, 6, 26),
    
  680.             timezone.datetime(2018, 6, 28),
    
  681.             timezone.datetime(2018, 6, 29),
    
  682.         ]
    
  683.         reservation = HotelReservation.objects.create(
    
  684.             datespan=DateRange(datetimes[0].date(), datetimes[1].date()),
    
  685.             start=datetimes[0],
    
  686.             end=datetimes[1],
    
  687.             room=room102,
    
  688.         )
    
  689.         constraint.validate(HotelReservation, reservation)
    
  690.         HotelReservation.objects.create(
    
  691.             datespan=DateRange(datetimes[1].date(), datetimes[3].date()),
    
  692.             start=datetimes[1],
    
  693.             end=datetimes[3],
    
  694.             room=room102,
    
  695.         )
    
  696.         HotelReservation.objects.create(
    
  697.             datespan=DateRange(datetimes[3].date(), datetimes[4].date()),
    
  698.             start=datetimes[3],
    
  699.             end=datetimes[4],
    
  700.             room=room102,
    
  701.             cancelled=True,
    
  702.         )
    
  703.         # Overlap dates.
    
  704.         with self.assertRaises(IntegrityError), transaction.atomic():
    
  705.             reservation = HotelReservation(
    
  706.                 datespan=(datetimes[1].date(), datetimes[2].date()),
    
  707.                 start=datetimes[1],
    
  708.                 end=datetimes[2],
    
  709.                 room=room102,
    
  710.             )
    
  711.             msg = f"Constraint “{constraint.name}” is violated."
    
  712.             with self.assertRaisesMessage(ValidationError, msg):
    
  713.                 constraint.validate(HotelReservation, reservation)
    
  714.             reservation.save()
    
  715.         # Valid range.
    
  716.         other_valid_reservations = [
    
  717.             # Other room.
    
  718.             HotelReservation(
    
  719.                 datespan=(datetimes[1].date(), datetimes[2].date()),
    
  720.                 start=datetimes[1],
    
  721.                 end=datetimes[2],
    
  722.                 room=room101,
    
  723.             ),
    
  724.             # Cancelled reservation.
    
  725.             HotelReservation(
    
  726.                 datespan=(datetimes[1].date(), datetimes[1].date()),
    
  727.                 start=datetimes[1],
    
  728.                 end=datetimes[2],
    
  729.                 room=room102,
    
  730.                 cancelled=True,
    
  731.             ),
    
  732.             # Other adjacent dates.
    
  733.             HotelReservation(
    
  734.                 datespan=(datetimes[3].date(), datetimes[4].date()),
    
  735.                 start=datetimes[3],
    
  736.                 end=datetimes[4],
    
  737.                 room=room102,
    
  738.             ),
    
  739.         ]
    
  740.         for reservation in other_valid_reservations:
    
  741.             constraint.validate(HotelReservation, reservation)
    
  742.         HotelReservation.objects.bulk_create(other_valid_reservations)
    
  743.         # Excluded fields.
    
  744.         constraint.validate(
    
  745.             HotelReservation,
    
  746.             HotelReservation(
    
  747.                 datespan=(datetimes[1].date(), datetimes[2].date()),
    
  748.                 start=datetimes[1],
    
  749.                 end=datetimes[2],
    
  750.                 room=room102,
    
  751.             ),
    
  752.             exclude={"room"},
    
  753.         )
    
  754.         constraint.validate(
    
  755.             HotelReservation,
    
  756.             HotelReservation(
    
  757.                 datespan=(datetimes[1].date(), datetimes[2].date()),
    
  758.                 start=datetimes[1],
    
  759.                 end=datetimes[2],
    
  760.                 room=room102,
    
  761.             ),
    
  762.             exclude={"datespan", "start", "end", "room"},
    
  763.         )
    
  764. 
    
  765.     @ignore_warnings(category=RemovedInDjango50Warning)
    
  766.     def test_range_overlaps_custom_opclasses(self):
    
  767.         class TsTzRange(Func):
    
  768.             function = "TSTZRANGE"
    
  769.             output_field = DateTimeRangeField()
    
  770. 
    
  771.         constraint = ExclusionConstraint(
    
  772.             name="exclude_overlapping_reservations_custom",
    
  773.             expressions=[
    
  774.                 (TsTzRange("start", "end", RangeBoundary()), RangeOperators.OVERLAPS),
    
  775.                 ("room", RangeOperators.EQUAL),
    
  776.             ],
    
  777.             condition=Q(cancelled=False),
    
  778.             opclasses=["range_ops", "gist_int4_ops"],
    
  779.         )
    
  780.         self._test_range_overlaps(constraint)
    
  781. 
    
  782.     def test_range_overlaps_custom(self):
    
  783.         class TsTzRange(Func):
    
  784.             function = "TSTZRANGE"
    
  785.             output_field = DateTimeRangeField()
    
  786. 
    
  787.         constraint = ExclusionConstraint(
    
  788.             name="exclude_overlapping_reservations_custom_opclass",
    
  789.             expressions=[
    
  790.                 (
    
  791.                     OpClass(TsTzRange("start", "end", RangeBoundary()), "range_ops"),
    
  792.                     RangeOperators.OVERLAPS,
    
  793.                 ),
    
  794.                 (OpClass("room", "gist_int4_ops"), RangeOperators.EQUAL),
    
  795.             ],
    
  796.             condition=Q(cancelled=False),
    
  797.         )
    
  798.         self._test_range_overlaps(constraint)
    
  799. 
    
  800.     def test_range_overlaps(self):
    
  801.         constraint = ExclusionConstraint(
    
  802.             name="exclude_overlapping_reservations",
    
  803.             expressions=[
    
  804.                 (F("datespan"), RangeOperators.OVERLAPS),
    
  805.                 ("room", RangeOperators.EQUAL),
    
  806.             ],
    
  807.             condition=Q(cancelled=False),
    
  808.         )
    
  809.         self._test_range_overlaps(constraint)
    
  810. 
    
  811.     def test_range_adjacent(self):
    
  812.         constraint_name = "ints_adjacent"
    
  813.         self.assertNotIn(
    
  814.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  815.         )
    
  816.         constraint = ExclusionConstraint(
    
  817.             name=constraint_name,
    
  818.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  819.         )
    
  820.         with connection.schema_editor() as editor:
    
  821.             editor.add_constraint(RangesModel, constraint)
    
  822.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  823.         RangesModel.objects.create(ints=(20, 50))
    
  824.         with self.assertRaises(IntegrityError), transaction.atomic():
    
  825.             RangesModel.objects.create(ints=(10, 20))
    
  826.         RangesModel.objects.create(ints=(10, 19))
    
  827.         RangesModel.objects.create(ints=(51, 60))
    
  828.         # Drop the constraint.
    
  829.         with connection.schema_editor() as editor:
    
  830.             editor.remove_constraint(RangesModel, constraint)
    
  831.         self.assertNotIn(
    
  832.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  833.         )
    
  834. 
    
  835.     def test_validate_range_adjacent(self):
    
  836.         constraint = ExclusionConstraint(
    
  837.             name="ints_adjacent",
    
  838.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  839.             violation_error_message="Custom error message.",
    
  840.         )
    
  841.         range_obj = RangesModel.objects.create(ints=(20, 50))
    
  842.         constraint.validate(RangesModel, range_obj)
    
  843.         msg = "Custom error message."
    
  844.         with self.assertRaisesMessage(ValidationError, msg):
    
  845.             constraint.validate(RangesModel, RangesModel(ints=(10, 20)))
    
  846.         constraint.validate(RangesModel, RangesModel(ints=(10, 19)))
    
  847.         constraint.validate(RangesModel, RangesModel(ints=(51, 60)))
    
  848.         constraint.validate(RangesModel, RangesModel(ints=(10, 20)), exclude={"ints"})
    
  849. 
    
  850.     def test_expressions_with_params(self):
    
  851.         constraint_name = "scene_left_equal"
    
  852.         self.assertNotIn(constraint_name, self.get_constraints(Scene._meta.db_table))
    
  853.         constraint = ExclusionConstraint(
    
  854.             name=constraint_name,
    
  855.             expressions=[(Left("scene", 4), RangeOperators.EQUAL)],
    
  856.         )
    
  857.         with connection.schema_editor() as editor:
    
  858.             editor.add_constraint(Scene, constraint)
    
  859.         self.assertIn(constraint_name, self.get_constraints(Scene._meta.db_table))
    
  860. 
    
  861.     def test_expressions_with_key_transform(self):
    
  862.         constraint_name = "exclude_overlapping_reservations_smoking"
    
  863.         constraint = ExclusionConstraint(
    
  864.             name=constraint_name,
    
  865.             expressions=[
    
  866.                 (F("datespan"), RangeOperators.OVERLAPS),
    
  867.                 (KeyTextTransform("smoking", "requirements"), RangeOperators.EQUAL),
    
  868.             ],
    
  869.         )
    
  870.         with connection.schema_editor() as editor:
    
  871.             editor.add_constraint(HotelReservation, constraint)
    
  872.         self.assertIn(
    
  873.             constraint_name,
    
  874.             self.get_constraints(HotelReservation._meta.db_table),
    
  875.         )
    
  876. 
    
  877.     def test_index_transform(self):
    
  878.         constraint_name = "first_index_equal"
    
  879.         constraint = ExclusionConstraint(
    
  880.             name=constraint_name,
    
  881.             expressions=[("field__0", RangeOperators.EQUAL)],
    
  882.         )
    
  883.         with connection.schema_editor() as editor:
    
  884.             editor.add_constraint(IntegerArrayModel, constraint)
    
  885.         self.assertIn(
    
  886.             constraint_name,
    
  887.             self.get_constraints(IntegerArrayModel._meta.db_table),
    
  888.         )
    
  889. 
    
  890.     def test_range_adjacent_initially_deferred(self):
    
  891.         constraint_name = "ints_adjacent_deferred"
    
  892.         self.assertNotIn(
    
  893.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  894.         )
    
  895.         constraint = ExclusionConstraint(
    
  896.             name=constraint_name,
    
  897.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  898.             deferrable=Deferrable.DEFERRED,
    
  899.         )
    
  900.         with connection.schema_editor() as editor:
    
  901.             editor.add_constraint(RangesModel, constraint)
    
  902.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  903.         RangesModel.objects.create(ints=(20, 50))
    
  904.         adjacent_range = RangesModel.objects.create(ints=(10, 20))
    
  905.         # Constraint behavior can be changed with SET CONSTRAINTS.
    
  906.         with self.assertRaises(IntegrityError):
    
  907.             with transaction.atomic(), connection.cursor() as cursor:
    
  908.                 quoted_name = connection.ops.quote_name(constraint_name)
    
  909.                 cursor.execute("SET CONSTRAINTS %s IMMEDIATE" % quoted_name)
    
  910.         # Remove adjacent range before the end of transaction.
    
  911.         adjacent_range.delete()
    
  912.         RangesModel.objects.create(ints=(10, 19))
    
  913.         RangesModel.objects.create(ints=(51, 60))
    
  914. 
    
  915.     @skipUnlessDBFeature("supports_covering_gist_indexes")
    
  916.     def test_range_adjacent_gist_include(self):
    
  917.         constraint_name = "ints_adjacent_gist_include"
    
  918.         self.assertNotIn(
    
  919.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  920.         )
    
  921.         constraint = ExclusionConstraint(
    
  922.             name=constraint_name,
    
  923.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  924.             index_type="gist",
    
  925.             include=["decimals", "ints"],
    
  926.         )
    
  927.         with connection.schema_editor() as editor:
    
  928.             editor.add_constraint(RangesModel, constraint)
    
  929.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  930.         RangesModel.objects.create(ints=(20, 50))
    
  931.         with self.assertRaises(IntegrityError), transaction.atomic():
    
  932.             RangesModel.objects.create(ints=(10, 20))
    
  933.         RangesModel.objects.create(ints=(10, 19))
    
  934.         RangesModel.objects.create(ints=(51, 60))
    
  935. 
    
  936.     @skipUnlessDBFeature("supports_covering_spgist_indexes")
    
  937.     def test_range_adjacent_spgist_include(self):
    
  938.         constraint_name = "ints_adjacent_spgist_include"
    
  939.         self.assertNotIn(
    
  940.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  941.         )
    
  942.         constraint = ExclusionConstraint(
    
  943.             name=constraint_name,
    
  944.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  945.             index_type="spgist",
    
  946.             include=["decimals", "ints"],
    
  947.         )
    
  948.         with connection.schema_editor() as editor:
    
  949.             editor.add_constraint(RangesModel, constraint)
    
  950.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  951.         RangesModel.objects.create(ints=(20, 50))
    
  952.         with self.assertRaises(IntegrityError), transaction.atomic():
    
  953.             RangesModel.objects.create(ints=(10, 20))
    
  954.         RangesModel.objects.create(ints=(10, 19))
    
  955.         RangesModel.objects.create(ints=(51, 60))
    
  956. 
    
  957.     @skipUnlessDBFeature("supports_covering_gist_indexes")
    
  958.     def test_range_adjacent_gist_include_condition(self):
    
  959.         constraint_name = "ints_adjacent_gist_include_condition"
    
  960.         self.assertNotIn(
    
  961.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  962.         )
    
  963.         constraint = ExclusionConstraint(
    
  964.             name=constraint_name,
    
  965.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  966.             index_type="gist",
    
  967.             include=["decimals"],
    
  968.             condition=Q(id__gte=100),
    
  969.         )
    
  970.         with connection.schema_editor() as editor:
    
  971.             editor.add_constraint(RangesModel, constraint)
    
  972.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  973. 
    
  974.     @skipUnlessDBFeature("supports_covering_spgist_indexes")
    
  975.     def test_range_adjacent_spgist_include_condition(self):
    
  976.         constraint_name = "ints_adjacent_spgist_include_condition"
    
  977.         self.assertNotIn(
    
  978.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  979.         )
    
  980.         constraint = ExclusionConstraint(
    
  981.             name=constraint_name,
    
  982.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  983.             index_type="spgist",
    
  984.             include=["decimals"],
    
  985.             condition=Q(id__gte=100),
    
  986.         )
    
  987.         with connection.schema_editor() as editor:
    
  988.             editor.add_constraint(RangesModel, constraint)
    
  989.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  990. 
    
  991.     @skipUnlessDBFeature("supports_covering_gist_indexes")
    
  992.     def test_range_adjacent_gist_include_deferrable(self):
    
  993.         constraint_name = "ints_adjacent_gist_include_deferrable"
    
  994.         self.assertNotIn(
    
  995.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  996.         )
    
  997.         constraint = ExclusionConstraint(
    
  998.             name=constraint_name,
    
  999.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  1000.             index_type="gist",
    
  1001.             include=["decimals"],
    
  1002.             deferrable=Deferrable.DEFERRED,
    
  1003.         )
    
  1004.         with connection.schema_editor() as editor:
    
  1005.             editor.add_constraint(RangesModel, constraint)
    
  1006.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  1007. 
    
  1008.     @skipUnlessDBFeature("supports_covering_spgist_indexes")
    
  1009.     def test_range_adjacent_spgist_include_deferrable(self):
    
  1010.         constraint_name = "ints_adjacent_spgist_include_deferrable"
    
  1011.         self.assertNotIn(
    
  1012.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  1013.         )
    
  1014.         constraint = ExclusionConstraint(
    
  1015.             name=constraint_name,
    
  1016.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  1017.             index_type="spgist",
    
  1018.             include=["decimals"],
    
  1019.             deferrable=Deferrable.DEFERRED,
    
  1020.         )
    
  1021.         with connection.schema_editor() as editor:
    
  1022.             editor.add_constraint(RangesModel, constraint)
    
  1023.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  1024. 
    
  1025.     def test_gist_include_not_supported(self):
    
  1026.         constraint_name = "ints_adjacent_gist_include_not_supported"
    
  1027.         constraint = ExclusionConstraint(
    
  1028.             name=constraint_name,
    
  1029.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  1030.             index_type="gist",
    
  1031.             include=["id"],
    
  1032.         )
    
  1033.         msg = (
    
  1034.             "Covering exclusion constraints using a GiST index require "
    
  1035.             "PostgreSQL 12+."
    
  1036.         )
    
  1037.         with connection.schema_editor() as editor:
    
  1038.             with mock.patch(
    
  1039.                 "django.db.backends.postgresql.features.DatabaseFeatures."
    
  1040.                 "supports_covering_gist_indexes",
    
  1041.                 False,
    
  1042.             ):
    
  1043.                 with self.assertRaisesMessage(NotSupportedError, msg):
    
  1044.                     editor.add_constraint(RangesModel, constraint)
    
  1045. 
    
  1046.     def test_spgist_include_not_supported(self):
    
  1047.         constraint_name = "ints_adjacent_spgist_include_not_supported"
    
  1048.         constraint = ExclusionConstraint(
    
  1049.             name=constraint_name,
    
  1050.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  1051.             index_type="spgist",
    
  1052.             include=["id"],
    
  1053.         )
    
  1054.         msg = (
    
  1055.             "Covering exclusion constraints using an SP-GiST index require "
    
  1056.             "PostgreSQL 14+."
    
  1057.         )
    
  1058.         with connection.schema_editor() as editor:
    
  1059.             with mock.patch(
    
  1060.                 "django.db.backends.postgresql.features.DatabaseFeatures."
    
  1061.                 "supports_covering_spgist_indexes",
    
  1062.                 False,
    
  1063.             ):
    
  1064.                 with self.assertRaisesMessage(NotSupportedError, msg):
    
  1065.                     editor.add_constraint(RangesModel, constraint)
    
  1066. 
    
  1067.     def test_range_adjacent_opclass(self):
    
  1068.         constraint_name = "ints_adjacent_opclass"
    
  1069.         self.assertNotIn(
    
  1070.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  1071.         )
    
  1072.         constraint = ExclusionConstraint(
    
  1073.             name=constraint_name,
    
  1074.             expressions=[
    
  1075.                 (OpClass("ints", name="range_ops"), RangeOperators.ADJACENT_TO),
    
  1076.             ],
    
  1077.         )
    
  1078.         with connection.schema_editor() as editor:
    
  1079.             editor.add_constraint(RangesModel, constraint)
    
  1080.         constraints = self.get_constraints(RangesModel._meta.db_table)
    
  1081.         self.assertIn(constraint_name, constraints)
    
  1082.         with editor.connection.cursor() as cursor:
    
  1083.             cursor.execute(SchemaTests.get_opclass_query, [constraint_name])
    
  1084.             self.assertEqual(
    
  1085.                 cursor.fetchall(),
    
  1086.                 [("range_ops", constraint_name)],
    
  1087.             )
    
  1088.         RangesModel.objects.create(ints=(20, 50))
    
  1089.         with self.assertRaises(IntegrityError), transaction.atomic():
    
  1090.             RangesModel.objects.create(ints=(10, 20))
    
  1091.         RangesModel.objects.create(ints=(10, 19))
    
  1092.         RangesModel.objects.create(ints=(51, 60))
    
  1093.         # Drop the constraint.
    
  1094.         with connection.schema_editor() as editor:
    
  1095.             editor.remove_constraint(RangesModel, constraint)
    
  1096.         self.assertNotIn(
    
  1097.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  1098.         )
    
  1099. 
    
  1100.     def test_range_adjacent_opclass_condition(self):
    
  1101.         constraint_name = "ints_adjacent_opclass_condition"
    
  1102.         self.assertNotIn(
    
  1103.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  1104.         )
    
  1105.         constraint = ExclusionConstraint(
    
  1106.             name=constraint_name,
    
  1107.             expressions=[
    
  1108.                 (OpClass("ints", name="range_ops"), RangeOperators.ADJACENT_TO),
    
  1109.             ],
    
  1110.             condition=Q(id__gte=100),
    
  1111.         )
    
  1112.         with connection.schema_editor() as editor:
    
  1113.             editor.add_constraint(RangesModel, constraint)
    
  1114.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  1115. 
    
  1116.     def test_range_adjacent_opclass_deferrable(self):
    
  1117.         constraint_name = "ints_adjacent_opclass_deferrable"
    
  1118.         self.assertNotIn(
    
  1119.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  1120.         )
    
  1121.         constraint = ExclusionConstraint(
    
  1122.             name=constraint_name,
    
  1123.             expressions=[
    
  1124.                 (OpClass("ints", name="range_ops"), RangeOperators.ADJACENT_TO),
    
  1125.             ],
    
  1126.             deferrable=Deferrable.DEFERRED,
    
  1127.         )
    
  1128.         with connection.schema_editor() as editor:
    
  1129.             editor.add_constraint(RangesModel, constraint)
    
  1130.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  1131. 
    
  1132.     @skipUnlessDBFeature("supports_covering_gist_indexes")
    
  1133.     def test_range_adjacent_gist_opclass_include(self):
    
  1134.         constraint_name = "ints_adjacent_gist_opclass_include"
    
  1135.         self.assertNotIn(
    
  1136.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  1137.         )
    
  1138.         constraint = ExclusionConstraint(
    
  1139.             name=constraint_name,
    
  1140.             expressions=[
    
  1141.                 (OpClass("ints", name="range_ops"), RangeOperators.ADJACENT_TO),
    
  1142.             ],
    
  1143.             index_type="gist",
    
  1144.             include=["decimals"],
    
  1145.         )
    
  1146.         with connection.schema_editor() as editor:
    
  1147.             editor.add_constraint(RangesModel, constraint)
    
  1148.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  1149. 
    
  1150.     @skipUnlessDBFeature("supports_covering_spgist_indexes")
    
  1151.     def test_range_adjacent_spgist_opclass_include(self):
    
  1152.         constraint_name = "ints_adjacent_spgist_opclass_include"
    
  1153.         self.assertNotIn(
    
  1154.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  1155.         )
    
  1156.         constraint = ExclusionConstraint(
    
  1157.             name=constraint_name,
    
  1158.             expressions=[
    
  1159.                 (OpClass("ints", name="range_ops"), RangeOperators.ADJACENT_TO),
    
  1160.             ],
    
  1161.             index_type="spgist",
    
  1162.             include=["decimals"],
    
  1163.         )
    
  1164.         with connection.schema_editor() as editor:
    
  1165.             editor.add_constraint(RangesModel, constraint)
    
  1166.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  1167. 
    
  1168.     def test_range_equal_cast(self):
    
  1169.         constraint_name = "exclusion_equal_room_cast"
    
  1170.         self.assertNotIn(constraint_name, self.get_constraints(Room._meta.db_table))
    
  1171.         constraint = ExclusionConstraint(
    
  1172.             name=constraint_name,
    
  1173.             expressions=[(Cast("number", IntegerField()), RangeOperators.EQUAL)],
    
  1174.         )
    
  1175.         with connection.schema_editor() as editor:
    
  1176.             editor.add_constraint(Room, constraint)
    
  1177.         self.assertIn(constraint_name, self.get_constraints(Room._meta.db_table))
    
  1178. 
    
  1179.     @isolate_apps("postgres_tests")
    
  1180.     def test_table_create(self):
    
  1181.         constraint_name = "exclusion_equal_number_tc"
    
  1182. 
    
  1183.         class ModelWithExclusionConstraint(Model):
    
  1184.             number = IntegerField()
    
  1185. 
    
  1186.             class Meta:
    
  1187.                 app_label = "postgres_tests"
    
  1188.                 constraints = [
    
  1189.                     ExclusionConstraint(
    
  1190.                         name=constraint_name,
    
  1191.                         expressions=[("number", RangeOperators.EQUAL)],
    
  1192.                     )
    
  1193.                 ]
    
  1194. 
    
  1195.         with connection.schema_editor() as editor:
    
  1196.             editor.create_model(ModelWithExclusionConstraint)
    
  1197.         self.assertIn(
    
  1198.             constraint_name,
    
  1199.             self.get_constraints(ModelWithExclusionConstraint._meta.db_table),
    
  1200.         )
    
  1201. 
    
  1202. 
    
  1203. @modify_settings(INSTALLED_APPS={"append": "django.contrib.postgres"})
    
  1204. class ExclusionConstraintOpclassesDepracationTests(PostgreSQLTestCase):
    
  1205.     def get_constraints(self, table):
    
  1206.         """Get the constraints on the table using a new cursor."""
    
  1207.         with connection.cursor() as cursor:
    
  1208.             return connection.introspection.get_constraints(cursor, table)
    
  1209. 
    
  1210.     def test_warning(self):
    
  1211.         msg = (
    
  1212.             "The opclasses argument is deprecated in favor of using "
    
  1213.             "django.contrib.postgres.indexes.OpClass in "
    
  1214.             "ExclusionConstraint.expressions."
    
  1215.         )
    
  1216.         with self.assertWarnsMessage(RemovedInDjango50Warning, msg):
    
  1217.             ExclusionConstraint(
    
  1218.                 name="exclude_overlapping",
    
  1219.                 expressions=[(F("datespan"), RangeOperators.ADJACENT_TO)],
    
  1220.                 opclasses=["range_ops"],
    
  1221.             )
    
  1222. 
    
  1223.     @ignore_warnings(category=RemovedInDjango50Warning)
    
  1224.     def test_repr(self):
    
  1225.         constraint = ExclusionConstraint(
    
  1226.             name="exclude_overlapping",
    
  1227.             expressions=[(F("datespan"), RangeOperators.ADJACENT_TO)],
    
  1228.             opclasses=["range_ops"],
    
  1229.         )
    
  1230.         self.assertEqual(
    
  1231.             repr(constraint),
    
  1232.             "<ExclusionConstraint: index_type='GIST' expressions=["
    
  1233.             "(F(datespan), '-|-')] name='exclude_overlapping' "
    
  1234.             "opclasses=['range_ops']>",
    
  1235.         )
    
  1236. 
    
  1237.     @ignore_warnings(category=RemovedInDjango50Warning)
    
  1238.     def test_range_adjacent_opclasses(self):
    
  1239.         constraint_name = "ints_adjacent_opclasses"
    
  1240.         self.assertNotIn(
    
  1241.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  1242.         )
    
  1243.         constraint = ExclusionConstraint(
    
  1244.             name=constraint_name,
    
  1245.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  1246.             opclasses=["range_ops"],
    
  1247.         )
    
  1248.         with connection.schema_editor() as editor:
    
  1249.             editor.add_constraint(RangesModel, constraint)
    
  1250.         constraints = self.get_constraints(RangesModel._meta.db_table)
    
  1251.         self.assertIn(constraint_name, constraints)
    
  1252.         with editor.connection.cursor() as cursor:
    
  1253.             cursor.execute(SchemaTests.get_opclass_query, [constraint.name])
    
  1254.             self.assertEqual(
    
  1255.                 cursor.fetchall(),
    
  1256.                 [("range_ops", constraint.name)],
    
  1257.             )
    
  1258.         RangesModel.objects.create(ints=(20, 50))
    
  1259.         with self.assertRaises(IntegrityError), transaction.atomic():
    
  1260.             RangesModel.objects.create(ints=(10, 20))
    
  1261.         RangesModel.objects.create(ints=(10, 19))
    
  1262.         RangesModel.objects.create(ints=(51, 60))
    
  1263.         # Drop the constraint.
    
  1264.         with connection.schema_editor() as editor:
    
  1265.             editor.remove_constraint(RangesModel, constraint)
    
  1266.         self.assertNotIn(
    
  1267.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  1268.         )
    
  1269. 
    
  1270.     @ignore_warnings(category=RemovedInDjango50Warning)
    
  1271.     def test_range_adjacent_opclasses_condition(self):
    
  1272.         constraint_name = "ints_adjacent_opclasses_condition"
    
  1273.         self.assertNotIn(
    
  1274.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  1275.         )
    
  1276.         constraint = ExclusionConstraint(
    
  1277.             name=constraint_name,
    
  1278.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  1279.             opclasses=["range_ops"],
    
  1280.             condition=Q(id__gte=100),
    
  1281.         )
    
  1282.         with connection.schema_editor() as editor:
    
  1283.             editor.add_constraint(RangesModel, constraint)
    
  1284.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  1285. 
    
  1286.     @ignore_warnings(category=RemovedInDjango50Warning)
    
  1287.     def test_range_adjacent_opclasses_deferrable(self):
    
  1288.         constraint_name = "ints_adjacent_opclasses_deferrable"
    
  1289.         self.assertNotIn(
    
  1290.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  1291.         )
    
  1292.         constraint = ExclusionConstraint(
    
  1293.             name=constraint_name,
    
  1294.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  1295.             opclasses=["range_ops"],
    
  1296.             deferrable=Deferrable.DEFERRED,
    
  1297.         )
    
  1298.         with connection.schema_editor() as editor:
    
  1299.             editor.add_constraint(RangesModel, constraint)
    
  1300.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  1301. 
    
  1302.     @ignore_warnings(category=RemovedInDjango50Warning)
    
  1303.     @skipUnlessDBFeature("supports_covering_gist_indexes")
    
  1304.     def test_range_adjacent_gist_opclasses_include(self):
    
  1305.         constraint_name = "ints_adjacent_gist_opclasses_include"
    
  1306.         self.assertNotIn(
    
  1307.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  1308.         )
    
  1309.         constraint = ExclusionConstraint(
    
  1310.             name=constraint_name,
    
  1311.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  1312.             index_type="gist",
    
  1313.             opclasses=["range_ops"],
    
  1314.             include=["decimals"],
    
  1315.         )
    
  1316.         with connection.schema_editor() as editor:
    
  1317.             editor.add_constraint(RangesModel, constraint)
    
  1318.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
    
  1319. 
    
  1320.     @ignore_warnings(category=RemovedInDjango50Warning)
    
  1321.     @skipUnlessDBFeature("supports_covering_spgist_indexes")
    
  1322.     def test_range_adjacent_spgist_opclasses_include(self):
    
  1323.         constraint_name = "ints_adjacent_spgist_opclasses_include"
    
  1324.         self.assertNotIn(
    
  1325.             constraint_name, self.get_constraints(RangesModel._meta.db_table)
    
  1326.         )
    
  1327.         constraint = ExclusionConstraint(
    
  1328.             name=constraint_name,
    
  1329.             expressions=[("ints", RangeOperators.ADJACENT_TO)],
    
  1330.             index_type="spgist",
    
  1331.             opclasses=["range_ops"],
    
  1332.             include=["decimals"],
    
  1333.         )
    
  1334.         with connection.schema_editor() as editor:
    
  1335.             editor.add_constraint(RangesModel, constraint)
    
  1336.         self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))