1. from unittest import mock
    
  2. 
    
  3. from django.contrib.postgres.indexes import (
    
  4.     BloomIndex,
    
  5.     BrinIndex,
    
  6.     BTreeIndex,
    
  7.     GinIndex,
    
  8.     GistIndex,
    
  9.     HashIndex,
    
  10.     OpClass,
    
  11.     PostgresIndex,
    
  12.     SpGistIndex,
    
  13. )
    
  14. from django.db import NotSupportedError, connection
    
  15. from django.db.models import CharField, F, Index, Q
    
  16. from django.db.models.functions import Cast, Collate, Length, Lower
    
  17. from django.test import skipUnlessDBFeature
    
  18. from django.test.utils import modify_settings, register_lookup
    
  19. 
    
  20. from . import PostgreSQLSimpleTestCase, PostgreSQLTestCase
    
  21. from .fields import SearchVector, SearchVectorField
    
  22. from .models import CharFieldModel, IntegerArrayModel, Scene, TextFieldModel
    
  23. 
    
  24. 
    
  25. class IndexTestMixin:
    
  26.     def test_name_auto_generation(self):
    
  27.         index = self.index_class(fields=["field"])
    
  28.         index.set_name_with_model(CharFieldModel)
    
  29.         self.assertRegex(
    
  30.             index.name, r"postgres_te_field_[0-9a-f]{6}_%s" % self.index_class.suffix
    
  31.         )
    
  32. 
    
  33.     def test_deconstruction_no_customization(self):
    
  34.         index = self.index_class(
    
  35.             fields=["title"], name="test_title_%s" % self.index_class.suffix
    
  36.         )
    
  37.         path, args, kwargs = index.deconstruct()
    
  38.         self.assertEqual(
    
  39.             path, "django.contrib.postgres.indexes.%s" % self.index_class.__name__
    
  40.         )
    
  41.         self.assertEqual(args, ())
    
  42.         self.assertEqual(
    
  43.             kwargs,
    
  44.             {"fields": ["title"], "name": "test_title_%s" % self.index_class.suffix},
    
  45.         )
    
  46. 
    
  47.     def test_deconstruction_with_expressions_no_customization(self):
    
  48.         name = f"test_title_{self.index_class.suffix}"
    
  49.         index = self.index_class(Lower("title"), name=name)
    
  50.         path, args, kwargs = index.deconstruct()
    
  51.         self.assertEqual(
    
  52.             path,
    
  53.             f"django.contrib.postgres.indexes.{self.index_class.__name__}",
    
  54.         )
    
  55.         self.assertEqual(args, (Lower("title"),))
    
  56.         self.assertEqual(kwargs, {"name": name})
    
  57. 
    
  58. 
    
  59. class BloomIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase):
    
  60.     index_class = BloomIndex
    
  61. 
    
  62.     def test_suffix(self):
    
  63.         self.assertEqual(BloomIndex.suffix, "bloom")
    
  64. 
    
  65.     def test_deconstruction(self):
    
  66.         index = BloomIndex(fields=["title"], name="test_bloom", length=80, columns=[4])
    
  67.         path, args, kwargs = index.deconstruct()
    
  68.         self.assertEqual(path, "django.contrib.postgres.indexes.BloomIndex")
    
  69.         self.assertEqual(args, ())
    
  70.         self.assertEqual(
    
  71.             kwargs,
    
  72.             {
    
  73.                 "fields": ["title"],
    
  74.                 "name": "test_bloom",
    
  75.                 "length": 80,
    
  76.                 "columns": [4],
    
  77.             },
    
  78.         )
    
  79. 
    
  80.     def test_invalid_fields(self):
    
  81.         msg = "Bloom indexes support a maximum of 32 fields."
    
  82.         with self.assertRaisesMessage(ValueError, msg):
    
  83.             BloomIndex(fields=["title"] * 33, name="test_bloom")
    
  84. 
    
  85.     def test_invalid_columns(self):
    
  86.         msg = "BloomIndex.columns must be a list or tuple."
    
  87.         with self.assertRaisesMessage(ValueError, msg):
    
  88.             BloomIndex(fields=["title"], name="test_bloom", columns="x")
    
  89.         msg = "BloomIndex.columns cannot have more values than fields."
    
  90.         with self.assertRaisesMessage(ValueError, msg):
    
  91.             BloomIndex(fields=["title"], name="test_bloom", columns=[4, 3])
    
  92. 
    
  93.     def test_invalid_columns_value(self):
    
  94.         msg = "BloomIndex.columns must contain integers from 1 to 4095."
    
  95.         for length in (0, 4096):
    
  96.             with self.subTest(length), self.assertRaisesMessage(ValueError, msg):
    
  97.                 BloomIndex(fields=["title"], name="test_bloom", columns=[length])
    
  98. 
    
  99.     def test_invalid_length(self):
    
  100.         msg = "BloomIndex.length must be None or an integer from 1 to 4096."
    
  101.         for length in (0, 4097):
    
  102.             with self.subTest(length), self.assertRaisesMessage(ValueError, msg):
    
  103.                 BloomIndex(fields=["title"], name="test_bloom", length=length)
    
  104. 
    
  105. 
    
  106. class BrinIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase):
    
  107.     index_class = BrinIndex
    
  108. 
    
  109.     def test_suffix(self):
    
  110.         self.assertEqual(BrinIndex.suffix, "brin")
    
  111. 
    
  112.     def test_deconstruction(self):
    
  113.         index = BrinIndex(
    
  114.             fields=["title"],
    
  115.             name="test_title_brin",
    
  116.             autosummarize=True,
    
  117.             pages_per_range=16,
    
  118.         )
    
  119.         path, args, kwargs = index.deconstruct()
    
  120.         self.assertEqual(path, "django.contrib.postgres.indexes.BrinIndex")
    
  121.         self.assertEqual(args, ())
    
  122.         self.assertEqual(
    
  123.             kwargs,
    
  124.             {
    
  125.                 "fields": ["title"],
    
  126.                 "name": "test_title_brin",
    
  127.                 "autosummarize": True,
    
  128.                 "pages_per_range": 16,
    
  129.             },
    
  130.         )
    
  131. 
    
  132.     def test_invalid_pages_per_range(self):
    
  133.         with self.assertRaisesMessage(
    
  134.             ValueError, "pages_per_range must be None or a positive integer"
    
  135.         ):
    
  136.             BrinIndex(fields=["title"], name="test_title_brin", pages_per_range=0)
    
  137. 
    
  138. 
    
  139. class BTreeIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase):
    
  140.     index_class = BTreeIndex
    
  141. 
    
  142.     def test_suffix(self):
    
  143.         self.assertEqual(BTreeIndex.suffix, "btree")
    
  144. 
    
  145.     def test_deconstruction(self):
    
  146.         index = BTreeIndex(fields=["title"], name="test_title_btree", fillfactor=80)
    
  147.         path, args, kwargs = index.deconstruct()
    
  148.         self.assertEqual(path, "django.contrib.postgres.indexes.BTreeIndex")
    
  149.         self.assertEqual(args, ())
    
  150.         self.assertEqual(
    
  151.             kwargs, {"fields": ["title"], "name": "test_title_btree", "fillfactor": 80}
    
  152.         )
    
  153. 
    
  154. 
    
  155. class GinIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase):
    
  156.     index_class = GinIndex
    
  157. 
    
  158.     def test_suffix(self):
    
  159.         self.assertEqual(GinIndex.suffix, "gin")
    
  160. 
    
  161.     def test_deconstruction(self):
    
  162.         index = GinIndex(
    
  163.             fields=["title"],
    
  164.             name="test_title_gin",
    
  165.             fastupdate=True,
    
  166.             gin_pending_list_limit=128,
    
  167.         )
    
  168.         path, args, kwargs = index.deconstruct()
    
  169.         self.assertEqual(path, "django.contrib.postgres.indexes.GinIndex")
    
  170.         self.assertEqual(args, ())
    
  171.         self.assertEqual(
    
  172.             kwargs,
    
  173.             {
    
  174.                 "fields": ["title"],
    
  175.                 "name": "test_title_gin",
    
  176.                 "fastupdate": True,
    
  177.                 "gin_pending_list_limit": 128,
    
  178.             },
    
  179.         )
    
  180. 
    
  181. 
    
  182. class GistIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase):
    
  183.     index_class = GistIndex
    
  184. 
    
  185.     def test_suffix(self):
    
  186.         self.assertEqual(GistIndex.suffix, "gist")
    
  187. 
    
  188.     def test_deconstruction(self):
    
  189.         index = GistIndex(
    
  190.             fields=["title"], name="test_title_gist", buffering=False, fillfactor=80
    
  191.         )
    
  192.         path, args, kwargs = index.deconstruct()
    
  193.         self.assertEqual(path, "django.contrib.postgres.indexes.GistIndex")
    
  194.         self.assertEqual(args, ())
    
  195.         self.assertEqual(
    
  196.             kwargs,
    
  197.             {
    
  198.                 "fields": ["title"],
    
  199.                 "name": "test_title_gist",
    
  200.                 "buffering": False,
    
  201.                 "fillfactor": 80,
    
  202.             },
    
  203.         )
    
  204. 
    
  205. 
    
  206. class HashIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase):
    
  207.     index_class = HashIndex
    
  208. 
    
  209.     def test_suffix(self):
    
  210.         self.assertEqual(HashIndex.suffix, "hash")
    
  211. 
    
  212.     def test_deconstruction(self):
    
  213.         index = HashIndex(fields=["title"], name="test_title_hash", fillfactor=80)
    
  214.         path, args, kwargs = index.deconstruct()
    
  215.         self.assertEqual(path, "django.contrib.postgres.indexes.HashIndex")
    
  216.         self.assertEqual(args, ())
    
  217.         self.assertEqual(
    
  218.             kwargs, {"fields": ["title"], "name": "test_title_hash", "fillfactor": 80}
    
  219.         )
    
  220. 
    
  221. 
    
  222. class SpGistIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase):
    
  223.     index_class = SpGistIndex
    
  224. 
    
  225.     def test_suffix(self):
    
  226.         self.assertEqual(SpGistIndex.suffix, "spgist")
    
  227. 
    
  228.     def test_deconstruction(self):
    
  229.         index = SpGistIndex(fields=["title"], name="test_title_spgist", fillfactor=80)
    
  230.         path, args, kwargs = index.deconstruct()
    
  231.         self.assertEqual(path, "django.contrib.postgres.indexes.SpGistIndex")
    
  232.         self.assertEqual(args, ())
    
  233.         self.assertEqual(
    
  234.             kwargs, {"fields": ["title"], "name": "test_title_spgist", "fillfactor": 80}
    
  235.         )
    
  236. 
    
  237. 
    
  238. @modify_settings(INSTALLED_APPS={"append": "django.contrib.postgres"})
    
  239. class SchemaTests(PostgreSQLTestCase):
    
    # Raw SQL returning (operator class name, index relation name) rows for
    # the index whose relation name is bound to the single %s parameter. It
    # joins pg_opclass to pg_index through indclass and to pg_class through
    # indexrelid.
    get_opclass_query = """
        SELECT opcname, c.relname FROM pg_opclass AS oc
        JOIN pg_index as i on oc.oid = ANY(i.indclass)
        JOIN pg_class as c on c.oid = i.indexrelid
        WHERE c.relname = %s
    """
    
  246. 
    
  247.     def get_constraints(self, table):
    
  248.         """
    
  249.         Get the indexes on the table using a new cursor.
    
  250.         """
    
  251.         with connection.cursor() as cursor:
    
  252.             return connection.introspection.get_constraints(cursor, table)
    
  253. 
    
  254.     def test_gin_index(self):
    
  255.         # Ensure the table is there and doesn't have an index.
    
  256.         self.assertNotIn(
    
  257.             "field", self.get_constraints(IntegerArrayModel._meta.db_table)
    
  258.         )
    
  259.         # Add the index
    
  260.         index_name = "integer_array_model_field_gin"
    
  261.         index = GinIndex(fields=["field"], name=index_name)
    
  262.         with connection.schema_editor() as editor:
    
  263.             editor.add_index(IntegerArrayModel, index)
    
  264.         constraints = self.get_constraints(IntegerArrayModel._meta.db_table)
    
  265.         # Check gin index was added
    
  266.         self.assertEqual(constraints[index_name]["type"], GinIndex.suffix)
    
  267.         # Drop the index
    
  268.         with connection.schema_editor() as editor:
    
  269.             editor.remove_index(IntegerArrayModel, index)
    
  270.         self.assertNotIn(
    
  271.             index_name, self.get_constraints(IntegerArrayModel._meta.db_table)
    
  272.         )
    
  273. 
    
  274.     def test_gin_fastupdate(self):
    
  275.         index_name = "integer_array_gin_fastupdate"
    
  276.         index = GinIndex(fields=["field"], name=index_name, fastupdate=False)
    
  277.         with connection.schema_editor() as editor:
    
  278.             editor.add_index(IntegerArrayModel, index)
    
  279.         constraints = self.get_constraints(IntegerArrayModel._meta.db_table)
    
  280.         self.assertEqual(constraints[index_name]["type"], "gin")
    
  281.         self.assertEqual(constraints[index_name]["options"], ["fastupdate=off"])
    
  282.         with connection.schema_editor() as editor:
    
  283.             editor.remove_index(IntegerArrayModel, index)
    
  284.         self.assertNotIn(
    
  285.             index_name, self.get_constraints(IntegerArrayModel._meta.db_table)
    
  286.         )
    
  287. 
    
  288.     def test_partial_gin_index(self):
    
  289.         with register_lookup(CharField, Length):
    
  290.             index_name = "char_field_gin_partial_idx"
    
  291.             index = GinIndex(
    
  292.                 fields=["field"], name=index_name, condition=Q(field__length=40)
    
  293.             )
    
  294.             with connection.schema_editor() as editor:
    
  295.                 editor.add_index(CharFieldModel, index)
    
  296.             constraints = self.get_constraints(CharFieldModel._meta.db_table)
    
  297.             self.assertEqual(constraints[index_name]["type"], "gin")
    
  298.             with connection.schema_editor() as editor:
    
  299.                 editor.remove_index(CharFieldModel, index)
    
  300.             self.assertNotIn(
    
  301.                 index_name, self.get_constraints(CharFieldModel._meta.db_table)
    
  302.             )
    
  303. 
    
  304.     def test_partial_gin_index_with_tablespace(self):
    
  305.         with register_lookup(CharField, Length):
    
  306.             index_name = "char_field_gin_partial_idx"
    
  307.             index = GinIndex(
    
  308.                 fields=["field"],
    
  309.                 name=index_name,
    
  310.                 condition=Q(field__length=40),
    
  311.                 db_tablespace="pg_default",
    
  312.             )
    
  313.             with connection.schema_editor() as editor:
    
  314.                 editor.add_index(CharFieldModel, index)
    
  315.                 self.assertIn(
    
  316.                     'TABLESPACE "pg_default" ',
    
  317.                     str(index.create_sql(CharFieldModel, editor)),
    
  318.                 )
    
  319.             constraints = self.get_constraints(CharFieldModel._meta.db_table)
    
  320.             self.assertEqual(constraints[index_name]["type"], "gin")
    
  321.             with connection.schema_editor() as editor:
    
  322.                 editor.remove_index(CharFieldModel, index)
    
  323.             self.assertNotIn(
    
  324.                 index_name, self.get_constraints(CharFieldModel._meta.db_table)
    
  325.             )
    
  326. 
    
  327.     def test_gin_parameters(self):
    
  328.         index_name = "integer_array_gin_params"
    
  329.         index = GinIndex(
    
  330.             fields=["field"],
    
  331.             name=index_name,
    
  332.             fastupdate=True,
    
  333.             gin_pending_list_limit=64,
    
  334.             db_tablespace="pg_default",
    
  335.         )
    
  336.         with connection.schema_editor() as editor:
    
  337.             editor.add_index(IntegerArrayModel, index)
    
  338.             self.assertIn(
    
  339.                 ") WITH (gin_pending_list_limit = 64, fastupdate = on) TABLESPACE",
    
  340.                 str(index.create_sql(IntegerArrayModel, editor)),
    
  341.             )
    
  342.         constraints = self.get_constraints(IntegerArrayModel._meta.db_table)
    
  343.         self.assertEqual(constraints[index_name]["type"], "gin")
    
  344.         self.assertEqual(
    
  345.             constraints[index_name]["options"],
    
  346.             ["gin_pending_list_limit=64", "fastupdate=on"],
    
  347.         )
    
  348.         with connection.schema_editor() as editor:
    
  349.             editor.remove_index(IntegerArrayModel, index)
    
  350.         self.assertNotIn(
    
  351.             index_name, self.get_constraints(IntegerArrayModel._meta.db_table)
    
  352.         )
    
  353. 
    
  354.     def test_trigram_op_class_gin_index(self):
    
  355.         index_name = "trigram_op_class_gin"
    
  356.         index = GinIndex(OpClass(F("scene"), name="gin_trgm_ops"), name=index_name)
    
  357.         with connection.schema_editor() as editor:
    
  358.             editor.add_index(Scene, index)
    
  359.         with editor.connection.cursor() as cursor:
    
  360.             cursor.execute(self.get_opclass_query, [index_name])
    
  361.             self.assertCountEqual(cursor.fetchall(), [("gin_trgm_ops", index_name)])
    
  362.         constraints = self.get_constraints(Scene._meta.db_table)
    
  363.         self.assertIn(index_name, constraints)
    
  364.         self.assertIn(constraints[index_name]["type"], GinIndex.suffix)
    
  365.         with connection.schema_editor() as editor:
    
  366.             editor.remove_index(Scene, index)
    
  367.         self.assertNotIn(index_name, self.get_constraints(Scene._meta.db_table))
    
  368. 
    
  369.     def test_cast_search_vector_gin_index(self):
    
  370.         index_name = "cast_search_vector_gin"
    
  371.         index = GinIndex(Cast("field", SearchVectorField()), name=index_name)
    
  372.         with connection.schema_editor() as editor:
    
  373.             editor.add_index(TextFieldModel, index)
    
  374.             sql = index.create_sql(TextFieldModel, editor)
    
  375.         table = TextFieldModel._meta.db_table
    
  376.         constraints = self.get_constraints(table)
    
  377.         self.assertIn(index_name, constraints)
    
  378.         self.assertIn(constraints[index_name]["type"], GinIndex.suffix)
    
  379.         self.assertIs(sql.references_column(table, "field"), True)
    
  380.         self.assertIn("::tsvector", str(sql))
    
  381.         with connection.schema_editor() as editor:
    
  382.             editor.remove_index(TextFieldModel, index)
    
  383.         self.assertNotIn(index_name, self.get_constraints(table))
    
  384. 
    
  385.     def test_bloom_index(self):
    
  386.         index_name = "char_field_model_field_bloom"
    
  387.         index = BloomIndex(fields=["field"], name=index_name)
    
  388.         with connection.schema_editor() as editor:
    
  389.             editor.add_index(CharFieldModel, index)
    
  390.         constraints = self.get_constraints(CharFieldModel._meta.db_table)
    
  391.         self.assertEqual(constraints[index_name]["type"], BloomIndex.suffix)
    
  392.         with connection.schema_editor() as editor:
    
  393.             editor.remove_index(CharFieldModel, index)
    
  394.         self.assertNotIn(
    
  395.             index_name, self.get_constraints(CharFieldModel._meta.db_table)
    
  396.         )
    
  397. 
    
  398.     def test_bloom_parameters(self):
    
  399.         index_name = "char_field_model_field_bloom_params"
    
  400.         index = BloomIndex(fields=["field"], name=index_name, length=512, columns=[3])
    
  401.         with connection.schema_editor() as editor:
    
  402.             editor.add_index(CharFieldModel, index)
    
  403.         constraints = self.get_constraints(CharFieldModel._meta.db_table)
    
  404.         self.assertEqual(constraints[index_name]["type"], BloomIndex.suffix)
    
  405.         self.assertEqual(constraints[index_name]["options"], ["length=512", "col1=3"])
    
  406.         with connection.schema_editor() as editor:
    
  407.             editor.remove_index(CharFieldModel, index)
    
  408.         self.assertNotIn(
    
  409.             index_name, self.get_constraints(CharFieldModel._meta.db_table)
    
  410.         )
    
  411. 
    
  412.     def test_brin_index(self):
    
  413.         index_name = "char_field_model_field_brin"
    
  414.         index = BrinIndex(fields=["field"], name=index_name, pages_per_range=4)
    
  415.         with connection.schema_editor() as editor:
    
  416.             editor.add_index(CharFieldModel, index)
    
  417.         constraints = self.get_constraints(CharFieldModel._meta.db_table)
    
  418.         self.assertEqual(constraints[index_name]["type"], BrinIndex.suffix)
    
  419.         self.assertEqual(constraints[index_name]["options"], ["pages_per_range=4"])
    
  420.         with connection.schema_editor() as editor:
    
  421.             editor.remove_index(CharFieldModel, index)
    
  422.         self.assertNotIn(
    
  423.             index_name, self.get_constraints(CharFieldModel._meta.db_table)
    
  424.         )
    
  425. 
    
  426.     def test_brin_parameters(self):
    
  427.         index_name = "char_field_brin_params"
    
  428.         index = BrinIndex(fields=["field"], name=index_name, autosummarize=True)
    
  429.         with connection.schema_editor() as editor:
    
  430.             editor.add_index(CharFieldModel, index)
    
  431.         constraints = self.get_constraints(CharFieldModel._meta.db_table)
    
  432.         self.assertEqual(constraints[index_name]["type"], BrinIndex.suffix)
    
  433.         self.assertEqual(constraints[index_name]["options"], ["autosummarize=on"])
    
  434.         with connection.schema_editor() as editor:
    
  435.             editor.remove_index(CharFieldModel, index)
    
  436.         self.assertNotIn(
    
  437.             index_name, self.get_constraints(CharFieldModel._meta.db_table)
    
  438.         )
    
  439. 
    
  440.     def test_btree_index(self):
    
  441.         # Ensure the table is there and doesn't have an index.
    
  442.         self.assertNotIn("field", self.get_constraints(CharFieldModel._meta.db_table))
    
  443.         # Add the index.
    
  444.         index_name = "char_field_model_field_btree"
    
  445.         index = BTreeIndex(fields=["field"], name=index_name)
    
  446.         with connection.schema_editor() as editor:
    
  447.             editor.add_index(CharFieldModel, index)
    
  448.         constraints = self.get_constraints(CharFieldModel._meta.db_table)
    
  449.         # The index was added.
    
  450.         self.assertEqual(constraints[index_name]["type"], BTreeIndex.suffix)
    
  451.         # Drop the index.
    
  452.         with connection.schema_editor() as editor:
    
  453.             editor.remove_index(CharFieldModel, index)
    
  454.         self.assertNotIn(
    
  455.             index_name, self.get_constraints(CharFieldModel._meta.db_table)
    
  456.         )
    
  457. 
    
  458.     def test_btree_parameters(self):
    
  459.         index_name = "integer_array_btree_fillfactor"
    
  460.         index = BTreeIndex(fields=["field"], name=index_name, fillfactor=80)
    
  461.         with connection.schema_editor() as editor:
    
  462.             editor.add_index(CharFieldModel, index)
    
  463.         constraints = self.get_constraints(CharFieldModel._meta.db_table)
    
  464.         self.assertEqual(constraints[index_name]["type"], BTreeIndex.suffix)
    
  465.         self.assertEqual(constraints[index_name]["options"], ["fillfactor=80"])
    
  466.         with connection.schema_editor() as editor:
    
  467.             editor.remove_index(CharFieldModel, index)
    
  468.         self.assertNotIn(
    
  469.             index_name, self.get_constraints(CharFieldModel._meta.db_table)
    
  470.         )
    
  471. 
    
  472.     def test_gist_index(self):
    
  473.         # Ensure the table is there and doesn't have an index.
    
  474.         self.assertNotIn("field", self.get_constraints(CharFieldModel._meta.db_table))
    
  475.         # Add the index.
    
  476.         index_name = "char_field_model_field_gist"
    
  477.         index = GistIndex(fields=["field"], name=index_name)
    
  478.         with connection.schema_editor() as editor:
    
  479.             editor.add_index(CharFieldModel, index)
    
  480.         constraints = self.get_constraints(CharFieldModel._meta.db_table)
    
  481.         # The index was added.
    
  482.         self.assertEqual(constraints[index_name]["type"], GistIndex.suffix)
    
  483.         # Drop the index.
    
  484.         with connection.schema_editor() as editor:
    
  485.             editor.remove_index(CharFieldModel, index)
    
  486.         self.assertNotIn(
    
  487.             index_name, self.get_constraints(CharFieldModel._meta.db_table)
    
  488.         )
    
  489. 
    
  490.     def test_gist_parameters(self):
    
  491.         index_name = "integer_array_gist_buffering"
    
  492.         index = GistIndex(
    
  493.             fields=["field"], name=index_name, buffering=True, fillfactor=80
    
  494.         )
    
  495.         with connection.schema_editor() as editor:
    
  496.             editor.add_index(CharFieldModel, index)
    
  497.         constraints = self.get_constraints(CharFieldModel._meta.db_table)
    
  498.         self.assertEqual(constraints[index_name]["type"], GistIndex.suffix)
    
  499.         self.assertEqual(
    
  500.             constraints[index_name]["options"], ["buffering=on", "fillfactor=80"]
    
  501.         )
    
  502.         with connection.schema_editor() as editor:
    
  503.             editor.remove_index(CharFieldModel, index)
    
  504.         self.assertNotIn(
    
  505.             index_name, self.get_constraints(CharFieldModel._meta.db_table)
    
  506.         )
    
  507. 
    
  508.     @skipUnlessDBFeature("supports_covering_gist_indexes")
    
  509.     def test_gist_include(self):
    
  510.         index_name = "scene_gist_include_setting"
    
  511.         index = GistIndex(name=index_name, fields=["scene"], include=["setting"])
    
  512.         with connection.schema_editor() as editor:
    
  513.             editor.add_index(Scene, index)
    
  514.         constraints = self.get_constraints(Scene._meta.db_table)
    
  515.         self.assertIn(index_name, constraints)
    
  516.         self.assertEqual(constraints[index_name]["type"], GistIndex.suffix)
    
  517.         self.assertEqual(constraints[index_name]["columns"], ["scene", "setting"])
    
  518.         with connection.schema_editor() as editor:
    
  519.             editor.remove_index(Scene, index)
    
  520.         self.assertNotIn(index_name, self.get_constraints(Scene._meta.db_table))
    
  521. 
    
  522.     def test_gist_include_not_supported(self):
    
  523.         index_name = "gist_include_exception"
    
  524.         index = GistIndex(fields=["scene"], name=index_name, include=["setting"])
    
  525.         msg = "Covering GiST indexes require PostgreSQL 12+."
    
  526.         with self.assertRaisesMessage(NotSupportedError, msg):
    
  527.             with mock.patch(
    
  528.                 "django.db.backends.postgresql.features.DatabaseFeatures."
    
  529.                 "supports_covering_gist_indexes",
    
  530.                 False,
    
  531.             ):
    
  532.                 with connection.schema_editor() as editor:
    
  533.                     editor.add_index(Scene, index)
    
  534.         self.assertNotIn(index_name, self.get_constraints(Scene._meta.db_table))
    
  535. 
    
  536.     def test_tsvector_op_class_gist_index(self):
    
  537.         index_name = "tsvector_op_class_gist"
    
  538.         index = GistIndex(
    
  539.             OpClass(
    
  540.                 SearchVector("scene", "setting", config="english"),
    
  541.                 name="tsvector_ops",
    
  542.             ),
    
  543.             name=index_name,
    
  544.         )
    
  545.         with connection.schema_editor() as editor:
    
  546.             editor.add_index(Scene, index)
    
  547.             sql = index.create_sql(Scene, editor)
    
  548.         table = Scene._meta.db_table
    
  549.         constraints = self.get_constraints(table)
    
  550.         self.assertIn(index_name, constraints)
    
  551.         self.assertIn(constraints[index_name]["type"], GistIndex.suffix)
    
  552.         self.assertIs(sql.references_column(table, "scene"), True)
    
  553.         self.assertIs(sql.references_column(table, "setting"), True)
    
  554.         with connection.schema_editor() as editor:
    
  555.             editor.remove_index(Scene, index)
    
  556.         self.assertNotIn(index_name, self.get_constraints(table))
    
  557. 
    
  558.     def test_hash_index(self):
    
  559.         # Ensure the table is there and doesn't have an index.
    
  560.         self.assertNotIn("field", self.get_constraints(CharFieldModel._meta.db_table))
    
  561.         # Add the index.
    
  562.         index_name = "char_field_model_field_hash"
    
  563.         index = HashIndex(fields=["field"], name=index_name)
    
  564.         with connection.schema_editor() as editor:
    
  565.             editor.add_index(CharFieldModel, index)
    
  566.         constraints = self.get_constraints(CharFieldModel._meta.db_table)
    
  567.         # The index was added.
    
  568.         self.assertEqual(constraints[index_name]["type"], HashIndex.suffix)
    
  569.         # Drop the index.
    
  570.         with connection.schema_editor() as editor:
    
  571.             editor.remove_index(CharFieldModel, index)
    
  572.         self.assertNotIn(
    
  573.             index_name, self.get_constraints(CharFieldModel._meta.db_table)
    
  574.         )
    
  575. 
    
  576.     def test_hash_parameters(self):
    
  577.         index_name = "integer_array_hash_fillfactor"
    
  578.         index = HashIndex(fields=["field"], name=index_name, fillfactor=80)
    
  579.         with connection.schema_editor() as editor:
    
  580.             editor.add_index(CharFieldModel, index)
    
  581.         constraints = self.get_constraints(CharFieldModel._meta.db_table)
    
  582.         self.assertEqual(constraints[index_name]["type"], HashIndex.suffix)
    
  583.         self.assertEqual(constraints[index_name]["options"], ["fillfactor=80"])
    
  584.         with connection.schema_editor() as editor:
    
  585.             editor.remove_index(CharFieldModel, index)
    
  586.         self.assertNotIn(
    
  587.             index_name, self.get_constraints(CharFieldModel._meta.db_table)
    
  588.         )
    
  589. 
    
  590.     def test_spgist_index(self):
    
  591.         # Ensure the table is there and doesn't have an index.
    
  592.         self.assertNotIn("field", self.get_constraints(TextFieldModel._meta.db_table))
    
  593.         # Add the index.
    
  594.         index_name = "text_field_model_field_spgist"
    
  595.         index = SpGistIndex(fields=["field"], name=index_name)
    
  596.         with connection.schema_editor() as editor:
    
  597.             editor.add_index(TextFieldModel, index)
    
  598.         constraints = self.get_constraints(TextFieldModel._meta.db_table)
    
  599.         # The index was added.
    
  600.         self.assertEqual(constraints[index_name]["type"], SpGistIndex.suffix)
    
  601.         # Drop the index.
    
  602.         with connection.schema_editor() as editor:
    
  603.             editor.remove_index(TextFieldModel, index)
    
  604.         self.assertNotIn(
    
  605.             index_name, self.get_constraints(TextFieldModel._meta.db_table)
    
  606.         )
    
  607. 
    
  608.     def test_spgist_parameters(self):
    
  609.         index_name = "text_field_model_spgist_fillfactor"
    
  610.         index = SpGistIndex(fields=["field"], name=index_name, fillfactor=80)
    
  611.         with connection.schema_editor() as editor:
    
  612.             editor.add_index(TextFieldModel, index)
    
  613.         constraints = self.get_constraints(TextFieldModel._meta.db_table)
    
  614.         self.assertEqual(constraints[index_name]["type"], SpGistIndex.suffix)
    
  615.         self.assertEqual(constraints[index_name]["options"], ["fillfactor=80"])
    
  616.         with connection.schema_editor() as editor:
    
  617.             editor.remove_index(TextFieldModel, index)
    
  618.         self.assertNotIn(
    
  619.             index_name, self.get_constraints(TextFieldModel._meta.db_table)
    
  620.         )
    
  621. 
    
  622.     @skipUnlessDBFeature("supports_covering_spgist_indexes")
    
  623.     def test_spgist_include(self):
    
  624.         index_name = "scene_spgist_include_setting"
    
  625.         index = SpGistIndex(name=index_name, fields=["scene"], include=["setting"])
    
  626.         with connection.schema_editor() as editor:
    
  627.             editor.add_index(Scene, index)
    
  628.         constraints = self.get_constraints(Scene._meta.db_table)
    
  629.         self.assertIn(index_name, constraints)
    
  630.         self.assertEqual(constraints[index_name]["type"], SpGistIndex.suffix)
    
  631.         self.assertEqual(constraints[index_name]["columns"], ["scene", "setting"])
    
  632.         with connection.schema_editor() as editor:
    
  633.             editor.remove_index(Scene, index)
    
  634.         self.assertNotIn(index_name, self.get_constraints(Scene._meta.db_table))
    
  635. 
    
  636.     def test_spgist_include_not_supported(self):
    
  637.         index_name = "spgist_include_exception"
    
  638.         index = SpGistIndex(fields=["scene"], name=index_name, include=["setting"])
    
  639.         msg = "Covering SP-GiST indexes require PostgreSQL 14+."
    
  640.         with self.assertRaisesMessage(NotSupportedError, msg):
    
  641.             with mock.patch(
    
  642.                 "django.db.backends.postgresql.features.DatabaseFeatures."
    
  643.                 "supports_covering_spgist_indexes",
    
  644.                 False,
    
  645.             ):
    
  646.                 with connection.schema_editor() as editor:
    
  647.                     editor.add_index(Scene, index)
    
  648.         self.assertNotIn(index_name, self.get_constraints(Scene._meta.db_table))
    
  649. 
    
  650.     def test_custom_suffix(self):
    
  651.         class CustomSuffixIndex(PostgresIndex):
    
  652.             suffix = "sfx"
    
  653. 
    
  654.             def create_sql(self, model, schema_editor, using="gin", **kwargs):
    
  655.                 return super().create_sql(model, schema_editor, using=using, **kwargs)
    
  656. 
    
  657.         index = CustomSuffixIndex(fields=["field"], name="custom_suffix_idx")
    
  658.         self.assertEqual(index.suffix, "sfx")
    
  659.         with connection.schema_editor() as editor:
    
  660.             self.assertIn(
    
  661.                 " USING gin ",
    
  662.                 str(index.create_sql(CharFieldModel, editor)),
    
  663.             )
    
  664. 
    
  665.     def test_op_class(self):
    
  666.         index_name = "test_op_class"
    
  667.         index = Index(
    
  668.             OpClass(Lower("field"), name="text_pattern_ops"),
    
  669.             name=index_name,
    
  670.         )
    
  671.         with connection.schema_editor() as editor:
    
  672.             editor.add_index(TextFieldModel, index)
    
  673.         with editor.connection.cursor() as cursor:
    
  674.             cursor.execute(self.get_opclass_query, [index_name])
    
  675.             self.assertCountEqual(cursor.fetchall(), [("text_pattern_ops", index_name)])
    
  676. 
    
  677.     def test_op_class_descending_collation(self):
    
  678.         collation = connection.features.test_collations.get("non_default")
    
  679.         if not collation:
    
  680.             self.skipTest("This backend does not support case-insensitive collations.")
    
  681.         index_name = "test_op_class_descending_collation"
    
  682.         index = Index(
    
  683.             Collate(
    
  684.                 OpClass(Lower("field"), name="text_pattern_ops").desc(nulls_last=True),
    
  685.                 collation=collation,
    
  686.             ),
    
  687.             name=index_name,
    
  688.         )
    
  689.         with connection.schema_editor() as editor:
    
  690.             editor.add_index(TextFieldModel, index)
    
  691.             self.assertIn(
    
  692.                 "COLLATE %s" % editor.quote_name(collation),
    
  693.                 str(index.create_sql(TextFieldModel, editor)),
    
  694.             )
    
  695.         with editor.connection.cursor() as cursor:
    
  696.             cursor.execute(self.get_opclass_query, [index_name])
    
  697.             self.assertCountEqual(cursor.fetchall(), [("text_pattern_ops", index_name)])
    
  698.         table = TextFieldModel._meta.db_table
    
  699.         constraints = self.get_constraints(table)
    
  700.         self.assertIn(index_name, constraints)
    
  701.         self.assertEqual(constraints[index_name]["orders"], ["DESC"])
    
  702.         with connection.schema_editor() as editor:
    
  703.             editor.remove_index(TextFieldModel, index)
    
  704.         self.assertNotIn(index_name, self.get_constraints(table))
    
  705. 
    
  706.     def test_op_class_descending_partial(self):
    
  707.         index_name = "test_op_class_descending_partial"
    
  708.         index = Index(
    
  709.             OpClass(Lower("field"), name="text_pattern_ops").desc(),
    
  710.             name=index_name,
    
  711.             condition=Q(field__contains="China"),
    
  712.         )
    
  713.         with connection.schema_editor() as editor:
    
  714.             editor.add_index(TextFieldModel, index)
    
  715.         with editor.connection.cursor() as cursor:
    
  716.             cursor.execute(self.get_opclass_query, [index_name])
    
  717.             self.assertCountEqual(cursor.fetchall(), [("text_pattern_ops", index_name)])
    
  718.         constraints = self.get_constraints(TextFieldModel._meta.db_table)
    
  719.         self.assertIn(index_name, constraints)
    
  720.         self.assertEqual(constraints[index_name]["orders"], ["DESC"])
    
  721. 
    
  722.     def test_op_class_descending_partial_tablespace(self):
    
  723.         index_name = "test_op_class_descending_partial_tablespace"
    
  724.         index = Index(
    
  725.             OpClass(Lower("field").desc(), name="text_pattern_ops"),
    
  726.             name=index_name,
    
  727.             condition=Q(field__contains="China"),
    
  728.             db_tablespace="pg_default",
    
  729.         )
    
  730.         with connection.schema_editor() as editor:
    
  731.             editor.add_index(TextFieldModel, index)
    
  732.             self.assertIn(
    
  733.                 'TABLESPACE "pg_default" ',
    
  734.                 str(index.create_sql(TextFieldModel, editor)),
    
  735.             )
    
  736.         with editor.connection.cursor() as cursor:
    
  737.             cursor.execute(self.get_opclass_query, [index_name])
    
  738.             self.assertCountEqual(cursor.fetchall(), [("text_pattern_ops", index_name)])
    
  739.         constraints = self.get_constraints(TextFieldModel._meta.db_table)
    
  740.         self.assertIn(index_name, constraints)
    
  741.         self.assertEqual(constraints[index_name]["orders"], ["DESC"])