1. from django.db import DatabaseError, connection
    
  2. from django.db.models import Index
    
  3. from django.test import TransactionTestCase, skipUnlessDBFeature
    
  4. 
    
  5. from .models import (
    
  6.     Article,
    
  7.     ArticleReporter,
    
  8.     CheckConstraintModel,
    
  9.     City,
    
  10.     Comment,
    
  11.     Country,
    
  12.     District,
    
  13.     Reporter,
    
  14.     UniqueConstraintConditionModel,
    
  15. )
    
  16. 
    
  17. 
    
  18. class IntrospectionTests(TransactionTestCase):
    
  19.     available_apps = ["introspection"]
    
  20. 
    
  21.     def test_table_names(self):
    
  22.         tl = connection.introspection.table_names()
    
  23.         self.assertEqual(tl, sorted(tl))
    
  24.         self.assertIn(
    
  25.             Reporter._meta.db_table,
    
  26.             tl,
    
  27.             "'%s' isn't in table_list()." % Reporter._meta.db_table,
    
  28.         )
    
  29.         self.assertIn(
    
  30.             Article._meta.db_table,
    
  31.             tl,
    
  32.             "'%s' isn't in table_list()." % Article._meta.db_table,
    
  33.         )
    
  34. 
    
  35.     def test_django_table_names(self):
    
  36.         with connection.cursor() as cursor:
    
  37.             cursor.execute("CREATE TABLE django_ixn_test_table (id INTEGER);")
    
  38.             tl = connection.introspection.django_table_names()
    
  39.             cursor.execute("DROP TABLE django_ixn_test_table;")
    
  40.             self.assertNotIn(
    
  41.                 "django_ixn_test_table",
    
  42.                 tl,
    
  43.                 "django_table_names() returned a non-Django table",
    
  44.             )
    
  45. 
    
  46.     def test_django_table_names_retval_type(self):
    
  47.         # Table name is a list #15216
    
  48.         tl = connection.introspection.django_table_names(only_existing=True)
    
  49.         self.assertIs(type(tl), list)
    
  50.         tl = connection.introspection.django_table_names(only_existing=False)
    
  51.         self.assertIs(type(tl), list)
    
  52. 
    
  53.     def test_table_names_with_views(self):
    
  54.         with connection.cursor() as cursor:
    
  55.             try:
    
  56.                 cursor.execute(
    
  57.                     "CREATE VIEW introspection_article_view AS SELECT headline "
    
  58.                     "from introspection_article;"
    
  59.                 )
    
  60.             except DatabaseError as e:
    
  61.                 if "insufficient privileges" in str(e):
    
  62.                     self.fail("The test user has no CREATE VIEW privileges")
    
  63.                 else:
    
  64.                     raise
    
  65.         try:
    
  66.             self.assertIn(
    
  67.                 "introspection_article_view",
    
  68.                 connection.introspection.table_names(include_views=True),
    
  69.             )
    
  70.             self.assertNotIn(
    
  71.                 "introspection_article_view", connection.introspection.table_names()
    
  72.             )
    
  73.         finally:
    
  74.             with connection.cursor() as cursor:
    
  75.                 cursor.execute("DROP VIEW introspection_article_view")
    
  76. 
    
  77.     def test_unmanaged_through_model(self):
    
  78.         tables = connection.introspection.django_table_names()
    
  79.         self.assertNotIn(ArticleReporter._meta.db_table, tables)
    
  80. 
    
  81.     def test_installed_models(self):
    
  82.         tables = [Article._meta.db_table, Reporter._meta.db_table]
    
  83.         models = connection.introspection.installed_models(tables)
    
  84.         self.assertEqual(models, {Article, Reporter})
    
  85. 
    
  86.     def test_sequence_list(self):
    
  87.         sequences = connection.introspection.sequence_list()
    
  88.         reporter_seqs = [
    
  89.             seq for seq in sequences if seq["table"] == Reporter._meta.db_table
    
  90.         ]
    
  91.         self.assertEqual(
    
  92.             len(reporter_seqs), 1, "Reporter sequence not found in sequence_list()"
    
  93.         )
    
  94.         self.assertEqual(reporter_seqs[0]["column"], "id")
    
  95. 
    
  96.     def test_get_table_description_names(self):
    
  97.         with connection.cursor() as cursor:
    
  98.             desc = connection.introspection.get_table_description(
    
  99.                 cursor, Reporter._meta.db_table
    
  100.             )
    
  101.         self.assertEqual(
    
  102.             [r[0] for r in desc], [f.column for f in Reporter._meta.fields]
    
  103.         )
    
  104. 
    
  105.     def test_get_table_description_types(self):
    
  106.         with connection.cursor() as cursor:
    
  107.             desc = connection.introspection.get_table_description(
    
  108.                 cursor, Reporter._meta.db_table
    
  109.             )
    
  110.         self.assertEqual(
    
  111.             [connection.introspection.get_field_type(r[1], r) for r in desc],
    
  112.             [
    
  113.                 connection.features.introspected_field_types[field]
    
  114.                 for field in (
    
  115.                     "AutoField",
    
  116.                     "CharField",
    
  117.                     "CharField",
    
  118.                     "CharField",
    
  119.                     "BigIntegerField",
    
  120.                     "BinaryField",
    
  121.                     "SmallIntegerField",
    
  122.                     "DurationField",
    
  123.                 )
    
  124.             ],
    
  125.         )
    
  126. 
    
  127.     def test_get_table_description_col_lengths(self):
    
  128.         with connection.cursor() as cursor:
    
  129.             desc = connection.introspection.get_table_description(
    
  130.                 cursor, Reporter._meta.db_table
    
  131.             )
    
  132.         self.assertEqual(
    
  133.             [
    
  134.                 r[3]
    
  135.                 for r in desc
    
  136.                 if connection.introspection.get_field_type(r[1], r) == "CharField"
    
  137.             ],
    
  138.             [30, 30, 254],
    
  139.         )
    
  140. 
    
  141.     def test_get_table_description_nullable(self):
    
  142.         with connection.cursor() as cursor:
    
  143.             desc = connection.introspection.get_table_description(
    
  144.                 cursor, Reporter._meta.db_table
    
  145.             )
    
  146.         nullable_by_backend = connection.features.interprets_empty_strings_as_nulls
    
  147.         self.assertEqual(
    
  148.             [r[6] for r in desc],
    
  149.             [
    
  150.                 False,
    
  151.                 nullable_by_backend,
    
  152.                 nullable_by_backend,
    
  153.                 nullable_by_backend,
    
  154.                 True,
    
  155.                 True,
    
  156.                 False,
    
  157.                 False,
    
  158.             ],
    
  159.         )
    
  160. 
    
  161.     def test_bigautofield(self):
    
  162.         with connection.cursor() as cursor:
    
  163.             desc = connection.introspection.get_table_description(
    
  164.                 cursor, City._meta.db_table
    
  165.             )
    
  166.         self.assertIn(
    
  167.             connection.features.introspected_field_types["BigAutoField"],
    
  168.             [connection.introspection.get_field_type(r[1], r) for r in desc],
    
  169.         )
    
  170. 
    
  171.     def test_smallautofield(self):
    
  172.         with connection.cursor() as cursor:
    
  173.             desc = connection.introspection.get_table_description(
    
  174.                 cursor, Country._meta.db_table
    
  175.             )
    
  176.         self.assertIn(
    
  177.             connection.features.introspected_field_types["SmallAutoField"],
    
  178.             [connection.introspection.get_field_type(r[1], r) for r in desc],
    
  179.         )
    
  180. 
    
  181.     # Regression test for #9991 - 'real' types in postgres
    
  182.     @skipUnlessDBFeature("has_real_datatype")
    
  183.     def test_postgresql_real_type(self):
    
  184.         with connection.cursor() as cursor:
    
  185.             cursor.execute("CREATE TABLE django_ixn_real_test_table (number REAL);")
    
  186.             desc = connection.introspection.get_table_description(
    
  187.                 cursor, "django_ixn_real_test_table"
    
  188.             )
    
  189.             cursor.execute("DROP TABLE django_ixn_real_test_table;")
    
  190.         self.assertEqual(
    
  191.             connection.introspection.get_field_type(desc[0][1], desc[0]), "FloatField"
    
  192.         )
    
  193. 
    
  194.     @skipUnlessDBFeature("can_introspect_foreign_keys")
    
  195.     def test_get_relations(self):
    
  196.         with connection.cursor() as cursor:
    
  197.             relations = connection.introspection.get_relations(
    
  198.                 cursor, Article._meta.db_table
    
  199.             )
    
  200. 
    
  201.         # That's {field_name: (field_name_other_table, other_table)}
    
  202.         expected_relations = {
    
  203.             "reporter_id": ("id", Reporter._meta.db_table),
    
  204.             "response_to_id": ("id", Article._meta.db_table),
    
  205.         }
    
  206.         self.assertEqual(relations, expected_relations)
    
  207. 
    
  208.         # Removing a field shouldn't disturb get_relations (#17785)
    
  209.         body = Article._meta.get_field("body")
    
  210.         with connection.schema_editor() as editor:
    
  211.             editor.remove_field(Article, body)
    
  212.         with connection.cursor() as cursor:
    
  213.             relations = connection.introspection.get_relations(
    
  214.                 cursor, Article._meta.db_table
    
  215.             )
    
  216.         with connection.schema_editor() as editor:
    
  217.             editor.add_field(Article, body)
    
  218.         self.assertEqual(relations, expected_relations)
    
  219. 
    
  220.     def test_get_primary_key_column(self):
    
  221.         with connection.cursor() as cursor:
    
  222.             primary_key_column = connection.introspection.get_primary_key_column(
    
  223.                 cursor, Article._meta.db_table
    
  224.             )
    
  225.             pk_fk_column = connection.introspection.get_primary_key_column(
    
  226.                 cursor, District._meta.db_table
    
  227.             )
    
  228.         self.assertEqual(primary_key_column, "id")
    
  229.         self.assertEqual(pk_fk_column, "city_id")
    
  230. 
    
  231.     def test_get_constraints_index_types(self):
    
  232.         with connection.cursor() as cursor:
    
  233.             constraints = connection.introspection.get_constraints(
    
  234.                 cursor, Article._meta.db_table
    
  235.             )
    
  236.         index = {}
    
  237.         index2 = {}
    
  238.         for val in constraints.values():
    
  239.             if val["columns"] == ["headline", "pub_date"]:
    
  240.                 index = val
    
  241.             if val["columns"] == [
    
  242.                 "headline",
    
  243.                 "response_to_id",
    
  244.                 "pub_date",
    
  245.                 "reporter_id",
    
  246.             ]:
    
  247.                 index2 = val
    
  248.         self.assertEqual(index["type"], Index.suffix)
    
  249.         self.assertEqual(index2["type"], Index.suffix)
    
  250. 
    
  251.     @skipUnlessDBFeature("supports_index_column_ordering")
    
  252.     def test_get_constraints_indexes_orders(self):
    
  253.         """
    
  254.         Indexes have the 'orders' key with a list of 'ASC'/'DESC' values.
    
  255.         """
    
  256.         with connection.cursor() as cursor:
    
  257.             constraints = connection.introspection.get_constraints(
    
  258.                 cursor, Article._meta.db_table
    
  259.             )
    
  260.         indexes_verified = 0
    
  261.         expected_columns = [
    
  262.             ["headline", "pub_date"],
    
  263.             ["headline", "response_to_id", "pub_date", "reporter_id"],
    
  264.         ]
    
  265.         if connection.features.indexes_foreign_keys:
    
  266.             expected_columns += [
    
  267.                 ["reporter_id"],
    
  268.                 ["response_to_id"],
    
  269.             ]
    
  270.         for val in constraints.values():
    
  271.             if val["index"] and not (val["primary_key"] or val["unique"]):
    
  272.                 self.assertIn(val["columns"], expected_columns)
    
  273.                 self.assertEqual(val["orders"], ["ASC"] * len(val["columns"]))
    
  274.                 indexes_verified += 1
    
  275.         self.assertEqual(indexes_verified, len(expected_columns))
    
  276. 
    
  277.     @skipUnlessDBFeature("supports_index_column_ordering", "supports_partial_indexes")
    
  278.     def test_get_constraints_unique_indexes_orders(self):
    
  279.         with connection.cursor() as cursor:
    
  280.             constraints = connection.introspection.get_constraints(
    
  281.                 cursor,
    
  282.                 UniqueConstraintConditionModel._meta.db_table,
    
  283.             )
    
  284.         self.assertIn("cond_name_without_color_uniq", constraints)
    
  285.         constraint = constraints["cond_name_without_color_uniq"]
    
  286.         self.assertIs(constraint["unique"], True)
    
  287.         self.assertEqual(constraint["columns"], ["name"])
    
  288.         self.assertEqual(constraint["orders"], ["ASC"])
    
  289. 
    
  290.     def test_get_constraints(self):
    
  291.         def assertDetails(
    
  292.             details,
    
  293.             cols,
    
  294.             primary_key=False,
    
  295.             unique=False,
    
  296.             index=False,
    
  297.             check=False,
    
  298.             foreign_key=None,
    
  299.         ):
    
  300.             # Different backends have different values for same constraints:
    
  301.             #               PRIMARY KEY     UNIQUE CONSTRAINT    UNIQUE INDEX
    
  302.             # MySQL      pk=1 uniq=1 idx=1  pk=0 uniq=1 idx=1  pk=0 uniq=1 idx=1
    
  303.             # PostgreSQL pk=1 uniq=1 idx=0  pk=0 uniq=1 idx=0  pk=0 uniq=1 idx=1
    
  304.             # SQLite     pk=1 uniq=0 idx=0  pk=0 uniq=1 idx=0  pk=0 uniq=1 idx=1
    
  305.             if details["primary_key"]:
    
  306.                 details["unique"] = True
    
  307.             if details["unique"]:
    
  308.                 details["index"] = False
    
  309.             self.assertEqual(details["columns"], cols)
    
  310.             self.assertEqual(details["primary_key"], primary_key)
    
  311.             self.assertEqual(details["unique"], unique)
    
  312.             self.assertEqual(details["index"], index)
    
  313.             self.assertEqual(details["check"], check)
    
  314.             self.assertEqual(details["foreign_key"], foreign_key)
    
  315. 
    
  316.         # Test custom constraints
    
  317.         custom_constraints = {
    
  318.             "article_email_pub_date_uniq",
    
  319.             "email_pub_date_idx",
    
  320.         }
    
  321.         with connection.cursor() as cursor:
    
  322.             constraints = connection.introspection.get_constraints(
    
  323.                 cursor, Comment._meta.db_table
    
  324.             )
    
  325.             if (
    
  326.                 connection.features.supports_column_check_constraints
    
  327.                 and connection.features.can_introspect_check_constraints
    
  328.             ):
    
  329.                 constraints.update(
    
  330.                     connection.introspection.get_constraints(
    
  331.                         cursor, CheckConstraintModel._meta.db_table
    
  332.                     )
    
  333.                 )
    
  334.                 custom_constraints.add("up_votes_gte_0_check")
    
  335.                 assertDetails(
    
  336.                     constraints["up_votes_gte_0_check"], ["up_votes"], check=True
    
  337.                 )
    
  338.         assertDetails(
    
  339.             constraints["article_email_pub_date_uniq"],
    
  340.             ["article_id", "email", "pub_date"],
    
  341.             unique=True,
    
  342.         )
    
  343.         assertDetails(
    
  344.             constraints["email_pub_date_idx"], ["email", "pub_date"], index=True
    
  345.         )
    
  346.         # Test field constraints
    
  347.         field_constraints = set()
    
  348.         for name, details in constraints.items():
    
  349.             if name in custom_constraints:
    
  350.                 continue
    
  351.             elif details["columns"] == ["up_votes"] and details["check"]:
    
  352.                 assertDetails(details, ["up_votes"], check=True)
    
  353.                 field_constraints.add(name)
    
  354.             elif details["columns"] == ["voting_number"] and details["check"]:
    
  355.                 assertDetails(details, ["voting_number"], check=True)
    
  356.                 field_constraints.add(name)
    
  357.             elif details["columns"] == ["ref"] and details["unique"]:
    
  358.                 assertDetails(details, ["ref"], unique=True)
    
  359.                 field_constraints.add(name)
    
  360.             elif details["columns"] == ["voting_number"] and details["unique"]:
    
  361.                 assertDetails(details, ["voting_number"], unique=True)
    
  362.                 field_constraints.add(name)
    
  363.             elif details["columns"] == ["article_id"] and details["index"]:
    
  364.                 assertDetails(details, ["article_id"], index=True)
    
  365.                 field_constraints.add(name)
    
  366.             elif details["columns"] == ["id"] and details["primary_key"]:
    
  367.                 assertDetails(details, ["id"], primary_key=True, unique=True)
    
  368.                 field_constraints.add(name)
    
  369.             elif details["columns"] == ["article_id"] and details["foreign_key"]:
    
  370.                 assertDetails(
    
  371.                     details, ["article_id"], foreign_key=("introspection_article", "id")
    
  372.                 )
    
  373.                 field_constraints.add(name)
    
  374.             elif details["check"]:
    
  375.                 # Some databases (e.g. Oracle) include additional check
    
  376.                 # constraints.
    
  377.                 field_constraints.add(name)
    
  378.         # All constraints are accounted for.
    
  379.         self.assertEqual(
    
  380.             constraints.keys() ^ (custom_constraints | field_constraints), set()
    
  381.         )