1. import os
    
  2. import re
    
  3. from io import StringIO
    
  4. from unittest import mock, skipUnless
    
  5. 
    
  6. from django.core.management import call_command
    
  7. from django.db import connection
    
  8. from django.db.backends.base.introspection import TableInfo
    
  9. from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
    
  10. 
    
  11. from .models import PeopleMoreData, test_collation
    
  12. 
    
  13. 
    
  14. def inspectdb_tables_only(table_name):
    
  15.     """
    
  16.     Limit introspection to tables created for models of this app.
    
  17.     Some databases such as Oracle are extremely slow at introspection.
    
  18.     """
    
  19.     return table_name.startswith("inspectdb_")
    
  20. 
    
  21. 
    
  22. def inspectdb_views_only(table_name):
    
  23.     return table_name.startswith("inspectdb_") and table_name.endswith(
    
  24.         ("_materialized", "_view")
    
  25.     )
    
  26. 
    
  27. 
    
  28. def special_table_only(table_name):
    
  29.     return table_name.startswith("inspectdb_special")
    
  30. 
    
  31. 
    
  32. class InspectDBTestCase(TestCase):
    
  33.     unique_re = re.compile(r".*unique_together = \((.+),\).*")
    
  34. 
    
  35.     def test_stealth_table_name_filter_option(self):
    
  36.         out = StringIO()
    
  37.         call_command("inspectdb", table_name_filter=inspectdb_tables_only, stdout=out)
    
  38.         error_message = (
    
  39.             "inspectdb has examined a table that should have been filtered out."
    
  40.         )
    
  41.         # contrib.contenttypes is one of the apps always installed when running
    
  42.         # the Django test suite, check that one of its tables hasn't been
    
  43.         # inspected
    
  44.         self.assertNotIn(
    
  45.             "class DjangoContentType(models.Model):", out.getvalue(), msg=error_message
    
  46.         )
    
  47. 
    
  48.     def test_table_option(self):
    
  49.         """
    
  50.         inspectdb can inspect a subset of tables by passing the table names as
    
  51.         arguments.
    
  52.         """
    
  53.         out = StringIO()
    
  54.         call_command("inspectdb", "inspectdb_people", stdout=out)
    
  55.         output = out.getvalue()
    
  56.         self.assertIn("class InspectdbPeople(models.Model):", output)
    
  57.         self.assertNotIn("InspectdbPeopledata", output)
    
  58. 
    
  59.     def make_field_type_asserter(self):
    
  60.         """
    
  61.         Call inspectdb and return a function to validate a field type in its
    
  62.         output.
    
  63.         """
    
  64.         out = StringIO()
    
  65.         call_command("inspectdb", "inspectdb_columntypes", stdout=out)
    
  66.         output = out.getvalue()
    
  67. 
    
  68.         def assertFieldType(name, definition):
    
  69.             out_def = re.search(r"^\s*%s = (models.*)$" % name, output, re.MULTILINE)[1]
    
  70.             self.assertEqual(definition, out_def)
    
  71. 
    
  72.         return assertFieldType
    
  73. 
    
  74.     def test_field_types(self):
    
  75.         """Test introspection of various Django field types"""
    
  76.         assertFieldType = self.make_field_type_asserter()
    
  77.         introspected_field_types = connection.features.introspected_field_types
    
  78.         char_field_type = introspected_field_types["CharField"]
    
  79.         # Inspecting Oracle DB doesn't produce correct results (#19884):
    
  80.         # - it reports fields as blank=True when they aren't.
    
  81.         if (
    
  82.             not connection.features.interprets_empty_strings_as_nulls
    
  83.             and char_field_type == "CharField"
    
  84.         ):
    
  85.             assertFieldType("char_field", "models.CharField(max_length=10)")
    
  86.             assertFieldType(
    
  87.                 "null_char_field",
    
  88.                 "models.CharField(max_length=10, blank=True, null=True)",
    
  89.             )
    
  90.             assertFieldType("email_field", "models.CharField(max_length=254)")
    
  91.             assertFieldType("file_field", "models.CharField(max_length=100)")
    
  92.             assertFieldType("file_path_field", "models.CharField(max_length=100)")
    
  93.             assertFieldType("slug_field", "models.CharField(max_length=50)")
    
  94.             assertFieldType("text_field", "models.TextField()")
    
  95.             assertFieldType("url_field", "models.CharField(max_length=200)")
    
  96.         if char_field_type == "TextField":
    
  97.             assertFieldType("char_field", "models.TextField()")
    
  98.             assertFieldType(
    
  99.                 "null_char_field", "models.TextField(blank=True, null=True)"
    
  100.             )
    
  101.             assertFieldType("email_field", "models.TextField()")
    
  102.             assertFieldType("file_field", "models.TextField()")
    
  103.             assertFieldType("file_path_field", "models.TextField()")
    
  104.             assertFieldType("slug_field", "models.TextField()")
    
  105.             assertFieldType("text_field", "models.TextField()")
    
  106.             assertFieldType("url_field", "models.TextField()")
    
  107.         assertFieldType("date_field", "models.DateField()")
    
  108.         assertFieldType("date_time_field", "models.DateTimeField()")
    
  109.         if introspected_field_types["GenericIPAddressField"] == "GenericIPAddressField":
    
  110.             assertFieldType("gen_ip_address_field", "models.GenericIPAddressField()")
    
  111.         elif not connection.features.interprets_empty_strings_as_nulls:
    
  112.             assertFieldType("gen_ip_address_field", "models.CharField(max_length=39)")
    
  113.         assertFieldType(
    
  114.             "time_field", "models.%s()" % introspected_field_types["TimeField"]
    
  115.         )
    
  116.         if connection.features.has_native_uuid_field:
    
  117.             assertFieldType("uuid_field", "models.UUIDField()")
    
  118.         elif not connection.features.interprets_empty_strings_as_nulls:
    
  119.             assertFieldType("uuid_field", "models.CharField(max_length=32)")
    
  120. 
    
  121.     @skipUnlessDBFeature("can_introspect_json_field", "supports_json_field")
    
  122.     def test_json_field(self):
    
  123.         out = StringIO()
    
  124.         call_command("inspectdb", "inspectdb_jsonfieldcolumntype", stdout=out)
    
  125.         output = out.getvalue()
    
  126.         if not connection.features.interprets_empty_strings_as_nulls:
    
  127.             self.assertIn("json_field = models.JSONField()", output)
    
  128.         self.assertIn(
    
  129.             "null_json_field = models.JSONField(blank=True, null=True)", output
    
  130.         )
    
  131. 
    
  132.     @skipUnlessDBFeature("supports_collation_on_charfield")
    
  133.     @skipUnless(test_collation, "Language collations are not supported.")
    
  134.     def test_char_field_db_collation(self):
    
  135.         out = StringIO()
    
  136.         call_command("inspectdb", "inspectdb_charfielddbcollation", stdout=out)
    
  137.         output = out.getvalue()
    
  138.         if not connection.features.interprets_empty_strings_as_nulls:
    
  139.             self.assertIn(
    
  140.                 "char_field = models.CharField(max_length=10, "
    
  141.                 "db_collation='%s')" % test_collation,
    
  142.                 output,
    
  143.             )
    
  144.         else:
    
  145.             self.assertIn(
    
  146.                 "char_field = models.CharField(max_length=10, "
    
  147.                 "db_collation='%s', blank=True, null=True)" % test_collation,
    
  148.                 output,
    
  149.             )
    
  150. 
    
  151.     @skipUnlessDBFeature("supports_collation_on_textfield")
    
  152.     @skipUnless(test_collation, "Language collations are not supported.")
    
  153.     def test_text_field_db_collation(self):
    
  154.         out = StringIO()
    
  155.         call_command("inspectdb", "inspectdb_textfielddbcollation", stdout=out)
    
  156.         output = out.getvalue()
    
  157.         if not connection.features.interprets_empty_strings_as_nulls:
    
  158.             self.assertIn(
    
  159.                 "text_field = models.TextField(db_collation='%s')" % test_collation,
    
  160.                 output,
    
  161.             )
    
  162.         else:
    
  163.             self.assertIn(
    
  164.                 "text_field = models.TextField(db_collation='%s, blank=True, "
    
  165.                 "null=True)" % test_collation,
    
  166.                 output,
    
  167.             )
    
  168. 
    
  169.     def test_number_field_types(self):
    
  170.         """Test introspection of various Django field types"""
    
  171.         assertFieldType = self.make_field_type_asserter()
    
  172.         introspected_field_types = connection.features.introspected_field_types
    
  173. 
    
  174.         auto_field_type = connection.features.introspected_field_types["AutoField"]
    
  175.         if auto_field_type != "AutoField":
    
  176.             assertFieldType(
    
  177.                 "id", "models.%s(primary_key=True)  # AutoField?" % auto_field_type
    
  178.             )
    
  179. 
    
  180.         assertFieldType(
    
  181.             "big_int_field", "models.%s()" % introspected_field_types["BigIntegerField"]
    
  182.         )
    
  183. 
    
  184.         bool_field_type = introspected_field_types["BooleanField"]
    
  185.         assertFieldType("bool_field", "models.{}()".format(bool_field_type))
    
  186.         assertFieldType(
    
  187.             "null_bool_field",
    
  188.             "models.{}(blank=True, null=True)".format(bool_field_type),
    
  189.         )
    
  190. 
    
  191.         if connection.vendor != "sqlite":
    
  192.             assertFieldType(
    
  193.                 "decimal_field", "models.DecimalField(max_digits=6, decimal_places=1)"
    
  194.             )
    
  195.         else:  # Guessed arguments on SQLite, see #5014
    
  196.             assertFieldType(
    
  197.                 "decimal_field",
    
  198.                 "models.DecimalField(max_digits=10, decimal_places=5)  "
    
  199.                 "# max_digits and decimal_places have been guessed, "
    
  200.                 "as this database handles decimal fields as float",
    
  201.             )
    
  202. 
    
  203.         assertFieldType("float_field", "models.FloatField()")
    
  204.         assertFieldType(
    
  205.             "int_field", "models.%s()" % introspected_field_types["IntegerField"]
    
  206.         )
    
  207.         assertFieldType(
    
  208.             "pos_int_field",
    
  209.             "models.%s()" % introspected_field_types["PositiveIntegerField"],
    
  210.         )
    
  211.         assertFieldType(
    
  212.             "pos_big_int_field",
    
  213.             "models.%s()" % introspected_field_types["PositiveBigIntegerField"],
    
  214.         )
    
  215.         assertFieldType(
    
  216.             "pos_small_int_field",
    
  217.             "models.%s()" % introspected_field_types["PositiveSmallIntegerField"],
    
  218.         )
    
  219.         assertFieldType(
    
  220.             "small_int_field",
    
  221.             "models.%s()" % introspected_field_types["SmallIntegerField"],
    
  222.         )
    
  223. 
    
  224.     @skipUnlessDBFeature("can_introspect_foreign_keys")
    
  225.     def test_attribute_name_not_python_keyword(self):
    
  226.         out = StringIO()
    
  227.         call_command("inspectdb", table_name_filter=inspectdb_tables_only, stdout=out)
    
  228.         output = out.getvalue()
    
  229.         error_message = (
    
  230.             "inspectdb generated an attribute name which is a Python keyword"
    
  231.         )
    
  232.         # Recursive foreign keys should be set to 'self'
    
  233.         self.assertIn("parent = models.ForeignKey('self', models.DO_NOTHING)", output)
    
  234.         self.assertNotIn(
    
  235.             "from = models.ForeignKey(InspectdbPeople, models.DO_NOTHING)",
    
  236.             output,
    
  237.             msg=error_message,
    
  238.         )
    
  239.         # As InspectdbPeople model is defined after InspectdbMessage, it should
    
  240.         # be quoted.
    
  241.         self.assertIn(
    
  242.             "from_field = models.ForeignKey('InspectdbPeople', models.DO_NOTHING, "
    
  243.             "db_column='from_id')",
    
  244.             output,
    
  245.         )
    
  246.         self.assertIn(
    
  247.             "people_pk = models.OneToOneField(InspectdbPeople, models.DO_NOTHING, "
    
  248.             "primary_key=True)",
    
  249.             output,
    
  250.         )
    
  251.         self.assertIn(
    
  252.             "people_unique = models.OneToOneField(InspectdbPeople, models.DO_NOTHING)",
    
  253.             output,
    
  254.         )
    
  255. 
    
  256.     @skipUnlessDBFeature("can_introspect_foreign_keys")
    
  257.     def test_foreign_key_to_field(self):
    
  258.         out = StringIO()
    
  259.         call_command("inspectdb", "inspectdb_foreignkeytofield", stdout=out)
    
  260.         self.assertIn(
    
  261.             "to_field_fk = models.ForeignKey('InspectdbPeoplemoredata', "
    
  262.             "models.DO_NOTHING, to_field='people_unique_id')",
    
  263.             out.getvalue(),
    
  264.         )
    
  265. 
    
  266.     def test_digits_column_name_introspection(self):
    
  267.         """Introspection of column names consist/start with digits (#16536/#17676)"""
    
  268.         char_field_type = connection.features.introspected_field_types["CharField"]
    
  269.         out = StringIO()
    
  270.         call_command("inspectdb", "inspectdb_digitsincolumnname", stdout=out)
    
  271.         output = out.getvalue()
    
  272.         error_message = "inspectdb generated a model field name which is a number"
    
  273.         self.assertNotIn(
    
  274.             "    123 = models.%s" % char_field_type, output, msg=error_message
    
  275.         )
    
  276.         self.assertIn("number_123 = models.%s" % char_field_type, output)
    
  277. 
    
  278.         error_message = (
    
  279.             "inspectdb generated a model field name which starts with a digit"
    
  280.         )
    
  281.         self.assertNotIn(
    
  282.             "    4extra = models.%s" % char_field_type, output, msg=error_message
    
  283.         )
    
  284.         self.assertIn("number_4extra = models.%s" % char_field_type, output)
    
  285. 
    
  286.         self.assertNotIn(
    
  287.             "    45extra = models.%s" % char_field_type, output, msg=error_message
    
  288.         )
    
  289.         self.assertIn("number_45extra = models.%s" % char_field_type, output)
    
  290. 
    
  291.     def test_special_column_name_introspection(self):
    
  292.         """
    
  293.         Introspection of column names containing special characters,
    
  294.         unsuitable for Python identifiers
    
  295.         """
    
  296.         out = StringIO()
    
  297.         call_command("inspectdb", table_name_filter=special_table_only, stdout=out)
    
  298.         output = out.getvalue()
    
  299.         base_name = connection.introspection.identifier_converter("Field")
    
  300.         integer_field_type = connection.features.introspected_field_types[
    
  301.             "IntegerField"
    
  302.         ]
    
  303.         self.assertIn("field = models.%s()" % integer_field_type, output)
    
  304.         self.assertIn(
    
  305.             "field_field = models.%s(db_column='%s_')"
    
  306.             % (integer_field_type, base_name),
    
  307.             output,
    
  308.         )
    
  309.         self.assertIn(
    
  310.             "field_field_0 = models.%s(db_column='%s__')"
    
  311.             % (integer_field_type, base_name),
    
  312.             output,
    
  313.         )
    
  314.         self.assertIn(
    
  315.             "field_field_1 = models.%s(db_column='__field')" % integer_field_type,
    
  316.             output,
    
  317.         )
    
  318.         self.assertIn(
    
  319.             "prc_x = models.{}(db_column='prc(%) x')".format(integer_field_type), output
    
  320.         )
    
  321.         self.assertIn("tamaño = models.%s()" % integer_field_type, output)
    
  322. 
    
  323.     def test_table_name_introspection(self):
    
  324.         """
    
  325.         Introspection of table names containing special characters,
    
  326.         unsuitable for Python identifiers
    
  327.         """
    
  328.         out = StringIO()
    
  329.         call_command("inspectdb", table_name_filter=special_table_only, stdout=out)
    
  330.         output = out.getvalue()
    
  331.         self.assertIn("class InspectdbSpecialTableName(models.Model):", output)
    
  332. 
    
  333.     @skipUnlessDBFeature("supports_expression_indexes")
    
  334.     def test_table_with_func_unique_constraint(self):
    
  335.         out = StringIO()
    
  336.         call_command("inspectdb", "inspectdb_funcuniqueconstraint", stdout=out)
    
  337.         output = out.getvalue()
    
  338.         self.assertIn("class InspectdbFuncuniqueconstraint(models.Model):", output)
    
  339. 
    
  340.     def test_managed_models(self):
    
  341.         """
    
  342.         By default the command generates models with `Meta.managed = False`.
    
  343.         """
    
  344.         out = StringIO()
    
  345.         call_command("inspectdb", "inspectdb_columntypes", stdout=out)
    
  346.         output = out.getvalue()
    
  347.         self.longMessage = False
    
  348.         self.assertIn(
    
  349.             "        managed = False",
    
  350.             output,
    
  351.             msg="inspectdb should generate unmanaged models.",
    
  352.         )
    
  353. 
    
  354.     def test_unique_together_meta(self):
    
  355.         out = StringIO()
    
  356.         call_command("inspectdb", "inspectdb_uniquetogether", stdout=out)
    
  357.         output = out.getvalue()
    
  358.         self.assertIn("    unique_together = (('", output)
    
  359.         unique_together_match = self.unique_re.findall(output)
    
  360.         # There should be one unique_together tuple.
    
  361.         self.assertEqual(len(unique_together_match), 1)
    
  362.         fields = unique_together_match[0]
    
  363.         # Fields with db_column = field name.
    
  364.         self.assertIn("('field1', 'field2')", fields)
    
  365.         # Fields from columns whose names are Python keywords.
    
  366.         self.assertIn("('field1', 'field2')", fields)
    
  367.         # Fields whose names normalize to the same Python field name and hence
    
  368.         # are given an integer suffix.
    
  369.         self.assertIn("('non_unique_column', 'non_unique_column_0')", fields)
    
  370. 
    
  371.     @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
    
  372.     def test_unsupported_unique_together(self):
    
  373.         """Unsupported index types (COALESCE here) are skipped."""
    
  374.         with connection.cursor() as c:
    
  375.             c.execute(
    
  376.                 "CREATE UNIQUE INDEX Findex ON %s "
    
  377.                 "(id, people_unique_id, COALESCE(message_id, -1))"
    
  378.                 % PeopleMoreData._meta.db_table
    
  379.             )
    
  380.         try:
    
  381.             out = StringIO()
    
  382.             call_command(
    
  383.                 "inspectdb",
    
  384.                 table_name_filter=lambda tn: tn.startswith(
    
  385.                     PeopleMoreData._meta.db_table
    
  386.                 ),
    
  387.                 stdout=out,
    
  388.             )
    
  389.             output = out.getvalue()
    
  390.             self.assertIn("# A unique constraint could not be introspected.", output)
    
  391.             self.assertEqual(
    
  392.                 self.unique_re.findall(output), ["('id', 'people_unique')"]
    
  393.             )
    
  394.         finally:
    
  395.             with connection.cursor() as c:
    
  396.                 c.execute("DROP INDEX Findex")
    
  397. 
    
  398.     @skipUnless(
    
  399.         connection.vendor == "sqlite",
    
  400.         "Only patched sqlite's DatabaseIntrospection.data_types_reverse for this test",
    
  401.     )
    
  402.     def test_custom_fields(self):
    
  403.         """
    
  404.         Introspection of columns with a custom field (#21090)
    
  405.         """
    
  406.         out = StringIO()
    
  407.         with mock.patch(
    
  408.             "django.db.connection.introspection.data_types_reverse."
    
  409.             "base_data_types_reverse",
    
  410.             {
    
  411.                 "text": "myfields.TextField",
    
  412.                 "bigint": "BigIntegerField",
    
  413.             },
    
  414.         ):
    
  415.             call_command("inspectdb", "inspectdb_columntypes", stdout=out)
    
  416.             output = out.getvalue()
    
  417.             self.assertIn("text_field = myfields.TextField()", output)
    
  418.             self.assertIn("big_int_field = models.BigIntegerField()", output)
    
  419. 
    
  420.     def test_introspection_errors(self):
    
  421.         """
    
  422.         Introspection errors should not crash the command, and the error should
    
  423.         be visible in the output.
    
  424.         """
    
  425.         out = StringIO()
    
  426.         with mock.patch(
    
  427.             "django.db.connection.introspection.get_table_list",
    
  428.             return_value=[TableInfo(name="nonexistent", type="t")],
    
  429.         ):
    
  430.             call_command("inspectdb", stdout=out)
    
  431.         output = out.getvalue()
    
  432.         self.assertIn("# Unable to inspect table 'nonexistent'", output)
    
  433.         # The error message depends on the backend
    
  434.         self.assertIn("# The error was:", output)
    
  435. 
    
  436. 
    
  437. class InspectDBTransactionalTests(TransactionTestCase):
    
  438.     available_apps = ["inspectdb"]
    
  439. 
    
  440.     def test_include_views(self):
    
  441.         """inspectdb --include-views creates models for database views."""
    
  442.         with connection.cursor() as cursor:
    
  443.             cursor.execute(
    
  444.                 "CREATE VIEW inspectdb_people_view AS "
    
  445.                 "SELECT id, name FROM inspectdb_people"
    
  446.             )
    
  447.         out = StringIO()
    
  448.         view_model = "class InspectdbPeopleView(models.Model):"
    
  449.         view_managed = "managed = False  # Created from a view."
    
  450.         try:
    
  451.             call_command(
    
  452.                 "inspectdb",
    
  453.                 table_name_filter=inspectdb_views_only,
    
  454.                 stdout=out,
    
  455.             )
    
  456.             no_views_output = out.getvalue()
    
  457.             self.assertNotIn(view_model, no_views_output)
    
  458.             self.assertNotIn(view_managed, no_views_output)
    
  459.             call_command(
    
  460.                 "inspectdb",
    
  461.                 table_name_filter=inspectdb_views_only,
    
  462.                 include_views=True,
    
  463.                 stdout=out,
    
  464.             )
    
  465.             with_views_output = out.getvalue()
    
  466.             self.assertIn(view_model, with_views_output)
    
  467.             self.assertIn(view_managed, with_views_output)
    
  468.         finally:
    
  469.             with connection.cursor() as cursor:
    
  470.                 cursor.execute("DROP VIEW inspectdb_people_view")
    
  471. 
    
  472.     @skipUnlessDBFeature("can_introspect_materialized_views")
    
  473.     def test_include_materialized_views(self):
    
  474.         """inspectdb --include-views creates models for materialized views."""
    
  475.         with connection.cursor() as cursor:
    
  476.             cursor.execute(
    
  477.                 "CREATE MATERIALIZED VIEW inspectdb_people_materialized AS "
    
  478.                 "SELECT id, name FROM inspectdb_people"
    
  479.             )
    
  480.         out = StringIO()
    
  481.         view_model = "class InspectdbPeopleMaterialized(models.Model):"
    
  482.         view_managed = "managed = False  # Created from a view."
    
  483.         try:
    
  484.             call_command(
    
  485.                 "inspectdb",
    
  486.                 table_name_filter=inspectdb_views_only,
    
  487.                 stdout=out,
    
  488.             )
    
  489.             no_views_output = out.getvalue()
    
  490.             self.assertNotIn(view_model, no_views_output)
    
  491.             self.assertNotIn(view_managed, no_views_output)
    
  492.             call_command(
    
  493.                 "inspectdb",
    
  494.                 table_name_filter=inspectdb_views_only,
    
  495.                 include_views=True,
    
  496.                 stdout=out,
    
  497.             )
    
  498.             with_views_output = out.getvalue()
    
  499.             self.assertIn(view_model, with_views_output)
    
  500.             self.assertIn(view_managed, with_views_output)
    
  501.         finally:
    
  502.             with connection.cursor() as cursor:
    
  503.                 cursor.execute("DROP MATERIALIZED VIEW inspectdb_people_materialized")
    
  504. 
    
  505.     @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
    
  506.     def test_include_partitions(self):
    
  507.         """inspectdb --include-partitions creates models for partitions."""
    
  508.         with connection.cursor() as cursor:
    
  509.             cursor.execute(
    
  510.                 """\
    
  511.                 CREATE TABLE inspectdb_partition_parent (name text not null)
    
  512.                 PARTITION BY LIST (left(upper(name), 1))
    
  513.             """
    
  514.             )
    
  515.             cursor.execute(
    
  516.                 """\
    
  517.                 CREATE TABLE inspectdb_partition_child
    
  518.                 PARTITION OF inspectdb_partition_parent
    
  519.                 FOR VALUES IN ('A', 'B', 'C')
    
  520.             """
    
  521.             )
    
  522.         out = StringIO()
    
  523.         partition_model_parent = "class InspectdbPartitionParent(models.Model):"
    
  524.         partition_model_child = "class InspectdbPartitionChild(models.Model):"
    
  525.         partition_managed = "managed = False  # Created from a partition."
    
  526.         try:
    
  527.             call_command(
    
  528.                 "inspectdb", table_name_filter=inspectdb_tables_only, stdout=out
    
  529.             )
    
  530.             no_partitions_output = out.getvalue()
    
  531.             self.assertIn(partition_model_parent, no_partitions_output)
    
  532.             self.assertNotIn(partition_model_child, no_partitions_output)
    
  533.             self.assertNotIn(partition_managed, no_partitions_output)
    
  534.             call_command(
    
  535.                 "inspectdb",
    
  536.                 table_name_filter=inspectdb_tables_only,
    
  537.                 include_partitions=True,
    
  538.                 stdout=out,
    
  539.             )
    
  540.             with_partitions_output = out.getvalue()
    
  541.             self.assertIn(partition_model_parent, with_partitions_output)
    
  542.             self.assertIn(partition_model_child, with_partitions_output)
    
  543.             self.assertIn(partition_managed, with_partitions_output)
    
  544.         finally:
    
  545.             with connection.cursor() as cursor:
    
  546.                 cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_child")
    
  547.                 cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_parent")
    
  548. 
    
  549.     @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
    
  550.     def test_foreign_data_wrapper(self):
    
  551.         with connection.cursor() as cursor:
    
  552.             cursor.execute("CREATE EXTENSION IF NOT EXISTS file_fdw")
    
  553.             cursor.execute(
    
  554.                 "CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw"
    
  555.             )
    
  556.             cursor.execute(
    
  557.                 """\
    
  558.                 CREATE FOREIGN TABLE inspectdb_iris_foreign_table (
    
  559.                     petal_length real,
    
  560.                     petal_width real,
    
  561.                     sepal_length real,
    
  562.                     sepal_width real
    
  563.                 ) SERVER inspectdb_server OPTIONS (
    
  564.                     filename %s
    
  565.                 )
    
  566.             """,
    
  567.                 [os.devnull],
    
  568.             )
    
  569.         out = StringIO()
    
  570.         foreign_table_model = "class InspectdbIrisForeignTable(models.Model):"
    
  571.         foreign_table_managed = "managed = False"
    
  572.         try:
    
  573.             call_command(
    
  574.                 "inspectdb",
    
  575.                 table_name_filter=inspectdb_tables_only,
    
  576.                 stdout=out,
    
  577.             )
    
  578.             output = out.getvalue()
    
  579.             self.assertIn(foreign_table_model, output)
    
  580.             self.assertIn(foreign_table_managed, output)
    
  581.         finally:
    
  582.             with connection.cursor() as cursor:
    
  583.                 cursor.execute(
    
  584.                     "DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table"
    
  585.                 )
    
  586.                 cursor.execute("DROP SERVER IF EXISTS inspectdb_server")
    
  587.                 cursor.execute("DROP EXTENSION IF EXISTS file_fdw")