1. from math import ceil
    
  2. from operator import attrgetter
    
  3. 
    
  4. from django.core.exceptions import FieldDoesNotExist
    
  5. from django.db import (
    
  6.     IntegrityError,
    
  7.     NotSupportedError,
    
  8.     OperationalError,
    
  9.     ProgrammingError,
    
  10.     connection,
    
  11. )
    
  12. from django.db.models import FileField, Value
    
  13. from django.db.models.functions import Lower
    
  14. from django.test import (
    
  15.     TestCase,
    
  16.     override_settings,
    
  17.     skipIfDBFeature,
    
  18.     skipUnlessDBFeature,
    
  19. )
    
  20. 
    
  21. from .models import (
    
  22.     BigAutoFieldModel,
    
  23.     Country,
    
  24.     FieldsWithDbColumns,
    
  25.     NoFields,
    
  26.     NullableFields,
    
  27.     Pizzeria,
    
  28.     ProxyCountry,
    
  29.     ProxyMultiCountry,
    
  30.     ProxyMultiProxyCountry,
    
  31.     ProxyProxyCountry,
    
  32.     RelatedModel,
    
  33.     Restaurant,
    
  34.     SmallAutoFieldModel,
    
  35.     State,
    
  36.     TwoFields,
    
  37.     UpsertConflict,
    
  38. )
    
  39. 
    
  40. 
    
  41. class BulkCreateTests(TestCase):
    
  42.     def setUp(self):
    
  43.         self.data = [
    
  44.             Country(name="United States of America", iso_two_letter="US"),
    
  45.             Country(name="The Netherlands", iso_two_letter="NL"),
    
  46.             Country(name="Germany", iso_two_letter="DE"),
    
  47.             Country(name="Czech Republic", iso_two_letter="CZ"),
    
  48.         ]
    
  49. 
    
  50.     def test_simple(self):
    
  51.         created = Country.objects.bulk_create(self.data)
    
  52.         self.assertEqual(created, self.data)
    
  53.         self.assertQuerysetEqual(
    
  54.             Country.objects.order_by("-name"),
    
  55.             [
    
  56.                 "United States of America",
    
  57.                 "The Netherlands",
    
  58.                 "Germany",
    
  59.                 "Czech Republic",
    
  60.             ],
    
  61.             attrgetter("name"),
    
  62.         )
    
  63. 
    
  64.         created = Country.objects.bulk_create([])
    
  65.         self.assertEqual(created, [])
    
  66.         self.assertEqual(Country.objects.count(), 4)
    
  67. 
    
  68.     @skipUnlessDBFeature("has_bulk_insert")
    
  69.     def test_efficiency(self):
    
  70.         with self.assertNumQueries(1):
    
  71.             Country.objects.bulk_create(self.data)
    
  72. 
    
  73.     @skipUnlessDBFeature("has_bulk_insert")
    
  74.     def test_long_non_ascii_text(self):
    
  75.         """
    
  76.         Inserting non-ASCII values with a length in the range 2001 to 4000
    
  77.         characters, i.e. 4002 to 8000 bytes, must be set as a CLOB on Oracle
    
  78.         (#22144).
    
  79.         """
    
  80.         Country.objects.bulk_create([Country(description="Ж" * 3000)])
    
  81.         self.assertEqual(Country.objects.count(), 1)
    
  82. 
    
  83.     @skipUnlessDBFeature("has_bulk_insert")
    
  84.     def test_long_and_short_text(self):
    
  85.         Country.objects.bulk_create(
    
  86.             [
    
  87.                 Country(description="a" * 4001, iso_two_letter="A"),
    
  88.                 Country(description="a", iso_two_letter="B"),
    
  89.                 Country(description="Ж" * 2001, iso_two_letter="C"),
    
  90.                 Country(description="Ж", iso_two_letter="D"),
    
  91.             ]
    
  92.         )
    
  93.         self.assertEqual(Country.objects.count(), 4)
    
  94. 
    
  95.     def test_multi_table_inheritance_unsupported(self):
    
  96.         expected_message = "Can't bulk create a multi-table inherited model"
    
  97.         with self.assertRaisesMessage(ValueError, expected_message):
    
  98.             Pizzeria.objects.bulk_create(
    
  99.                 [
    
  100.                     Pizzeria(name="The Art of Pizza"),
    
  101.                 ]
    
  102.             )
    
  103.         with self.assertRaisesMessage(ValueError, expected_message):
    
  104.             ProxyMultiCountry.objects.bulk_create(
    
  105.                 [
    
  106.                     ProxyMultiCountry(name="Fillory", iso_two_letter="FL"),
    
  107.                 ]
    
  108.             )
    
  109.         with self.assertRaisesMessage(ValueError, expected_message):
    
  110.             ProxyMultiProxyCountry.objects.bulk_create(
    
  111.                 [
    
  112.                     ProxyMultiProxyCountry(name="Fillory", iso_two_letter="FL"),
    
  113.                 ]
    
  114.             )
    
  115. 
    
  116.     def test_proxy_inheritance_supported(self):
    
  117.         ProxyCountry.objects.bulk_create(
    
  118.             [
    
  119.                 ProxyCountry(name="Qwghlm", iso_two_letter="QW"),
    
  120.                 Country(name="Tortall", iso_two_letter="TA"),
    
  121.             ]
    
  122.         )
    
  123.         self.assertQuerysetEqual(
    
  124.             ProxyCountry.objects.all(),
    
  125.             {"Qwghlm", "Tortall"},
    
  126.             attrgetter("name"),
    
  127.             ordered=False,
    
  128.         )
    
  129. 
    
  130.         ProxyProxyCountry.objects.bulk_create(
    
  131.             [
    
  132.                 ProxyProxyCountry(name="Netherlands", iso_two_letter="NT"),
    
  133.             ]
    
  134.         )
    
  135.         self.assertQuerysetEqual(
    
  136.             ProxyProxyCountry.objects.all(),
    
  137.             {
    
  138.                 "Qwghlm",
    
  139.                 "Tortall",
    
  140.                 "Netherlands",
    
  141.             },
    
  142.             attrgetter("name"),
    
  143.             ordered=False,
    
  144.         )
    
  145. 
    
  146.     def test_non_auto_increment_pk(self):
    
  147.         State.objects.bulk_create(
    
  148.             [State(two_letter_code=s) for s in ["IL", "NY", "CA", "ME"]]
    
  149.         )
    
  150.         self.assertQuerysetEqual(
    
  151.             State.objects.order_by("two_letter_code"),
    
  152.             [
    
  153.                 "CA",
    
  154.                 "IL",
    
  155.                 "ME",
    
  156.                 "NY",
    
  157.             ],
    
  158.             attrgetter("two_letter_code"),
    
  159.         )
    
  160. 
    
  161.     @skipUnlessDBFeature("has_bulk_insert")
    
  162.     def test_non_auto_increment_pk_efficiency(self):
    
  163.         with self.assertNumQueries(1):
    
  164.             State.objects.bulk_create(
    
  165.                 [State(two_letter_code=s) for s in ["IL", "NY", "CA", "ME"]]
    
  166.             )
    
  167.         self.assertQuerysetEqual(
    
  168.             State.objects.order_by("two_letter_code"),
    
  169.             [
    
  170.                 "CA",
    
  171.                 "IL",
    
  172.                 "ME",
    
  173.                 "NY",
    
  174.             ],
    
  175.             attrgetter("two_letter_code"),
    
  176.         )
    
  177. 
    
  178.     @skipIfDBFeature("allows_auto_pk_0")
    
  179.     def test_zero_as_autoval(self):
    
  180.         """
    
  181.         Zero as id for AutoField should raise exception in MySQL, because MySQL
    
  182.         does not allow zero for automatic primary key if the
    
  183.         NO_AUTO_VALUE_ON_ZERO SQL mode is not enabled.
    
  184.         """
    
  185.         valid_country = Country(name="Germany", iso_two_letter="DE")
    
  186.         invalid_country = Country(id=0, name="Poland", iso_two_letter="PL")
    
  187.         msg = "The database backend does not accept 0 as a value for AutoField."
    
  188.         with self.assertRaisesMessage(ValueError, msg):
    
  189.             Country.objects.bulk_create([valid_country, invalid_country])
    
  190. 
    
  191.     def test_batch_same_vals(self):
    
  192.         # SQLite had a problem where all the same-valued models were
    
  193.         # collapsed to one insert.
    
  194.         Restaurant.objects.bulk_create([Restaurant(name="foo") for i in range(0, 2)])
    
  195.         self.assertEqual(Restaurant.objects.count(), 2)
    
  196. 
    
  197.     def test_large_batch(self):
    
  198.         TwoFields.objects.bulk_create(
    
  199.             [TwoFields(f1=i, f2=i + 1) for i in range(0, 1001)]
    
  200.         )
    
  201.         self.assertEqual(TwoFields.objects.count(), 1001)
    
  202.         self.assertEqual(
    
  203.             TwoFields.objects.filter(f1__gte=450, f1__lte=550).count(), 101
    
  204.         )
    
  205.         self.assertEqual(TwoFields.objects.filter(f2__gte=901).count(), 101)
    
  206. 
    
  207.     @skipUnlessDBFeature("has_bulk_insert")
    
  208.     def test_large_single_field_batch(self):
    
  209.         # SQLite had a problem with more than 500 UNIONed selects in single
    
  210.         # query.
    
  211.         Restaurant.objects.bulk_create([Restaurant() for i in range(0, 501)])
    
  212. 
    
  213.     @skipUnlessDBFeature("has_bulk_insert")
    
  214.     def test_large_batch_efficiency(self):
    
  215.         with override_settings(DEBUG=True):
    
  216.             connection.queries_log.clear()
    
  217.             TwoFields.objects.bulk_create(
    
  218.                 [TwoFields(f1=i, f2=i + 1) for i in range(0, 1001)]
    
  219.             )
    
  220.             self.assertLess(len(connection.queries), 10)
    
  221. 
    
  222.     def test_large_batch_mixed(self):
    
  223.         """
    
  224.         Test inserting a large batch with objects having primary key set
    
  225.         mixed together with objects without PK set.
    
  226.         """
    
  227.         TwoFields.objects.bulk_create(
    
  228.             [
    
  229.                 TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1)
    
  230.                 for i in range(100000, 101000)
    
  231.             ]
    
  232.         )
    
  233.         self.assertEqual(TwoFields.objects.count(), 1000)
    
  234.         # We can't assume much about the ID's created, except that the above
    
  235.         # created IDs must exist.
    
  236.         id_range = range(100000, 101000, 2)
    
  237.         self.assertEqual(TwoFields.objects.filter(id__in=id_range).count(), 500)
    
  238.         self.assertEqual(TwoFields.objects.exclude(id__in=id_range).count(), 500)
    
  239. 
    
  240.     @skipUnlessDBFeature("has_bulk_insert")
    
  241.     def test_large_batch_mixed_efficiency(self):
    
  242.         """
    
  243.         Test inserting a large batch with objects having primary key set
    
  244.         mixed together with objects without PK set.
    
  245.         """
    
  246.         with override_settings(DEBUG=True):
    
  247.             connection.queries_log.clear()
    
  248.             TwoFields.objects.bulk_create(
    
  249.                 [
    
  250.                     TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1)
    
  251.                     for i in range(100000, 101000)
    
  252.                 ]
    
  253.             )
    
  254.             self.assertLess(len(connection.queries), 10)
    
  255. 
    
  256.     def test_explicit_batch_size(self):
    
  257.         objs = [TwoFields(f1=i, f2=i) for i in range(0, 4)]
    
  258.         num_objs = len(objs)
    
  259.         TwoFields.objects.bulk_create(objs, batch_size=1)
    
  260.         self.assertEqual(TwoFields.objects.count(), num_objs)
    
  261.         TwoFields.objects.all().delete()
    
  262.         TwoFields.objects.bulk_create(objs, batch_size=2)
    
  263.         self.assertEqual(TwoFields.objects.count(), num_objs)
    
  264.         TwoFields.objects.all().delete()
    
  265.         TwoFields.objects.bulk_create(objs, batch_size=3)
    
  266.         self.assertEqual(TwoFields.objects.count(), num_objs)
    
  267.         TwoFields.objects.all().delete()
    
  268.         TwoFields.objects.bulk_create(objs, batch_size=num_objs)
    
  269.         self.assertEqual(TwoFields.objects.count(), num_objs)
    
  270. 
    
  271.     def test_empty_model(self):
    
  272.         NoFields.objects.bulk_create([NoFields() for i in range(2)])
    
  273.         self.assertEqual(NoFields.objects.count(), 2)
    
  274. 
    
  275.     @skipUnlessDBFeature("has_bulk_insert")
    
  276.     def test_explicit_batch_size_efficiency(self):
    
  277.         objs = [TwoFields(f1=i, f2=i) for i in range(0, 100)]
    
  278.         with self.assertNumQueries(2):
    
  279.             TwoFields.objects.bulk_create(objs, 50)
    
  280.         TwoFields.objects.all().delete()
    
  281.         with self.assertNumQueries(1):
    
  282.             TwoFields.objects.bulk_create(objs, len(objs))
    
  283. 
    
  284.     @skipUnlessDBFeature("has_bulk_insert")
    
  285.     def test_explicit_batch_size_respects_max_batch_size(self):
    
  286.         objs = [Country(name=f"Country {i}") for i in range(1000)]
    
  287.         fields = ["name", "iso_two_letter", "description"]
    
  288.         max_batch_size = max(connection.ops.bulk_batch_size(fields, objs), 1)
    
  289.         with self.assertNumQueries(ceil(len(objs) / max_batch_size)):
    
  290.             Country.objects.bulk_create(objs, batch_size=max_batch_size + 1)
    
  291. 
    
  292.     @skipUnlessDBFeature("has_bulk_insert")
    
  293.     def test_bulk_insert_expressions(self):
    
  294.         Restaurant.objects.bulk_create(
    
  295.             [
    
  296.                 Restaurant(name="Sam's Shake Shack"),
    
  297.                 Restaurant(name=Lower(Value("Betty's Beetroot Bar"))),
    
  298.             ]
    
  299.         )
    
  300.         bbb = Restaurant.objects.filter(name="betty's beetroot bar")
    
  301.         self.assertEqual(bbb.count(), 1)
    
  302. 
    
  303.     @skipUnlessDBFeature("has_bulk_insert")
    
  304.     def test_bulk_insert_nullable_fields(self):
    
  305.         fk_to_auto_fields = {
    
  306.             "auto_field": NoFields.objects.create(),
    
  307.             "small_auto_field": SmallAutoFieldModel.objects.create(),
    
  308.             "big_auto_field": BigAutoFieldModel.objects.create(),
    
  309.         }
    
  310.         # NULL can be mixed with other values in nullable fields
    
  311.         nullable_fields = [
    
  312.             field for field in NullableFields._meta.get_fields() if field.name != "id"
    
  313.         ]
    
  314.         NullableFields.objects.bulk_create(
    
  315.             [
    
  316.                 NullableFields(**{**fk_to_auto_fields, field.name: None})
    
  317.                 for field in nullable_fields
    
  318.             ]
    
  319.         )
    
  320.         self.assertEqual(NullableFields.objects.count(), len(nullable_fields))
    
  321.         for field in nullable_fields:
    
  322.             with self.subTest(field=field):
    
  323.                 field_value = "" if isinstance(field, FileField) else None
    
  324.                 self.assertEqual(
    
  325.                     NullableFields.objects.filter(**{field.name: field_value}).count(),
    
  326.                     1,
    
  327.                 )
    
  328. 
    
  329.     @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
    
  330.     def test_set_pk_and_insert_single_item(self):
    
  331.         with self.assertNumQueries(1):
    
  332.             countries = Country.objects.bulk_create([self.data[0]])
    
  333.         self.assertEqual(len(countries), 1)
    
  334.         self.assertEqual(Country.objects.get(pk=countries[0].pk), countries[0])
    
  335. 
    
  336.     @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
    
  337.     def test_set_pk_and_query_efficiency(self):
    
  338.         with self.assertNumQueries(1):
    
  339.             countries = Country.objects.bulk_create(self.data)
    
  340.         self.assertEqual(len(countries), 4)
    
  341.         self.assertEqual(Country.objects.get(pk=countries[0].pk), countries[0])
    
  342.         self.assertEqual(Country.objects.get(pk=countries[1].pk), countries[1])
    
  343.         self.assertEqual(Country.objects.get(pk=countries[2].pk), countries[2])
    
  344.         self.assertEqual(Country.objects.get(pk=countries[3].pk), countries[3])
    
  345. 
    
  346.     @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
    
  347.     def test_set_state(self):
    
  348.         country_nl = Country(name="Netherlands", iso_two_letter="NL")
    
  349.         country_be = Country(name="Belgium", iso_two_letter="BE")
    
  350.         Country.objects.bulk_create([country_nl])
    
  351.         country_be.save()
    
  352.         # Objects save via bulk_create() and save() should have equal state.
    
  353.         self.assertEqual(country_nl._state.adding, country_be._state.adding)
    
  354.         self.assertEqual(country_nl._state.db, country_be._state.db)
    
  355. 
    
  356.     def test_set_state_with_pk_specified(self):
    
  357.         state_ca = State(two_letter_code="CA")
    
  358.         state_ny = State(two_letter_code="NY")
    
  359.         State.objects.bulk_create([state_ca])
    
  360.         state_ny.save()
    
  361.         # Objects save via bulk_create() and save() should have equal state.
    
  362.         self.assertEqual(state_ca._state.adding, state_ny._state.adding)
    
  363.         self.assertEqual(state_ca._state.db, state_ny._state.db)
    
  364. 
    
  365.     @skipIfDBFeature("supports_ignore_conflicts")
    
  366.     def test_ignore_conflicts_value_error(self):
    
  367.         message = "This database backend does not support ignoring conflicts."
    
  368.         with self.assertRaisesMessage(NotSupportedError, message):
    
  369.             TwoFields.objects.bulk_create(self.data, ignore_conflicts=True)
    
  370. 
    
  371.     @skipUnlessDBFeature("supports_ignore_conflicts")
    
  372.     def test_ignore_conflicts_ignore(self):
    
  373.         data = [
    
  374.             TwoFields(f1=1, f2=1),
    
  375.             TwoFields(f1=2, f2=2),
    
  376.             TwoFields(f1=3, f2=3),
    
  377.         ]
    
  378.         TwoFields.objects.bulk_create(data)
    
  379.         self.assertEqual(TwoFields.objects.count(), 3)
    
  380.         # With ignore_conflicts=True, conflicts are ignored.
    
  381.         conflicting_objects = [
    
  382.             TwoFields(f1=2, f2=2),
    
  383.             TwoFields(f1=3, f2=3),
    
  384.         ]
    
  385.         TwoFields.objects.bulk_create([conflicting_objects[0]], ignore_conflicts=True)
    
  386.         TwoFields.objects.bulk_create(conflicting_objects, ignore_conflicts=True)
    
  387.         self.assertEqual(TwoFields.objects.count(), 3)
    
  388.         self.assertIsNone(conflicting_objects[0].pk)
    
  389.         self.assertIsNone(conflicting_objects[1].pk)
    
  390.         # New objects are created and conflicts are ignored.
    
  391.         new_object = TwoFields(f1=4, f2=4)
    
  392.         TwoFields.objects.bulk_create(
    
  393.             conflicting_objects + [new_object], ignore_conflicts=True
    
  394.         )
    
  395.         self.assertEqual(TwoFields.objects.count(), 4)
    
  396.         self.assertIsNone(new_object.pk)
    
  397.         # Without ignore_conflicts=True, there's a problem.
    
  398.         with self.assertRaises(IntegrityError):
    
  399.             TwoFields.objects.bulk_create(conflicting_objects)
    
  400. 
    
  401.     def test_nullable_fk_after_parent(self):
    
  402.         parent = NoFields()
    
  403.         child = NullableFields(auto_field=parent, integer_field=88)
    
  404.         parent.save()
    
  405.         NullableFields.objects.bulk_create([child])
    
  406.         child = NullableFields.objects.get(integer_field=88)
    
  407.         self.assertEqual(child.auto_field, parent)
    
  408. 
    
  409.     @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
    
  410.     def test_nullable_fk_after_parent_bulk_create(self):
    
  411.         parent = NoFields()
    
  412.         child = NullableFields(auto_field=parent, integer_field=88)
    
  413.         NoFields.objects.bulk_create([parent])
    
  414.         NullableFields.objects.bulk_create([child])
    
  415.         child = NullableFields.objects.get(integer_field=88)
    
  416.         self.assertEqual(child.auto_field, parent)
    
  417. 
    
  418.     def test_unsaved_parent(self):
    
  419.         parent = NoFields()
    
  420.         msg = (
    
  421.             "bulk_create() prohibited to prevent data loss due to unsaved "
    
  422.             "related object 'auto_field'."
    
  423.         )
    
  424.         with self.assertRaisesMessage(ValueError, msg):
    
  425.             NullableFields.objects.bulk_create([NullableFields(auto_field=parent)])
    
  426. 
    
  427.     def test_invalid_batch_size_exception(self):
    
  428.         msg = "Batch size must be a positive integer."
    
  429.         with self.assertRaisesMessage(ValueError, msg):
    
  430.             Country.objects.bulk_create([], batch_size=-1)
    
  431. 
    
  432.     @skipIfDBFeature("supports_update_conflicts")
    
  433.     def test_update_conflicts_unsupported(self):
    
  434.         msg = "This database backend does not support updating conflicts."
    
  435.         with self.assertRaisesMessage(NotSupportedError, msg):
    
  436.             Country.objects.bulk_create(self.data, update_conflicts=True)
    
  437. 
    
  438.     @skipUnlessDBFeature("supports_ignore_conflicts", "supports_update_conflicts")
    
  439.     def test_ignore_update_conflicts_exclusive(self):
    
  440.         msg = "ignore_conflicts and update_conflicts are mutually exclusive"
    
  441.         with self.assertRaisesMessage(ValueError, msg):
    
  442.             Country.objects.bulk_create(
    
  443.                 self.data,
    
  444.                 ignore_conflicts=True,
    
  445.                 update_conflicts=True,
    
  446.             )
    
  447. 
    
  448.     @skipUnlessDBFeature("supports_update_conflicts")
    
  449.     def test_update_conflicts_no_update_fields(self):
    
  450.         msg = (
    
  451.             "Fields that will be updated when a row insertion fails on "
    
  452.             "conflicts must be provided."
    
  453.         )
    
  454.         with self.assertRaisesMessage(ValueError, msg):
    
  455.             Country.objects.bulk_create(self.data, update_conflicts=True)
    
  456. 
    
  457.     @skipUnlessDBFeature("supports_update_conflicts")
    
  458.     @skipIfDBFeature("supports_update_conflicts_with_target")
    
  459.     def test_update_conflicts_unique_field_unsupported(self):
    
  460.         msg = (
    
  461.             "This database backend does not support updating conflicts with "
    
  462.             "specifying unique fields that can trigger the upsert."
    
  463.         )
    
  464.         with self.assertRaisesMessage(NotSupportedError, msg):
    
  465.             TwoFields.objects.bulk_create(
    
  466.                 [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
    
  467.                 update_conflicts=True,
    
  468.                 update_fields=["f2"],
    
  469.                 unique_fields=["f1"],
    
  470.             )
    
  471. 
    
  472.     @skipUnlessDBFeature("supports_update_conflicts")
    
  473.     def test_update_conflicts_nonexistent_update_fields(self):
    
  474.         unique_fields = None
    
  475.         if connection.features.supports_update_conflicts_with_target:
    
  476.             unique_fields = ["f1"]
    
  477.         msg = "TwoFields has no field named 'nonexistent'"
    
  478.         with self.assertRaisesMessage(FieldDoesNotExist, msg):
    
  479.             TwoFields.objects.bulk_create(
    
  480.                 [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
    
  481.                 update_conflicts=True,
    
  482.                 update_fields=["nonexistent"],
    
  483.                 unique_fields=unique_fields,
    
  484.             )
    
  485. 
    
  486.     @skipUnlessDBFeature(
    
  487.         "supports_update_conflicts",
    
  488.         "supports_update_conflicts_with_target",
    
  489.     )
    
  490.     def test_update_conflicts_unique_fields_required(self):
    
  491.         msg = "Unique fields that can trigger the upsert must be provided."
    
  492.         with self.assertRaisesMessage(ValueError, msg):
    
  493.             TwoFields.objects.bulk_create(
    
  494.                 [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
    
  495.                 update_conflicts=True,
    
  496.                 update_fields=["f1"],
    
  497.             )
    
  498. 
    
  499.     @skipUnlessDBFeature(
    
  500.         "supports_update_conflicts",
    
  501.         "supports_update_conflicts_with_target",
    
  502.     )
    
  503.     def test_update_conflicts_invalid_update_fields(self):
    
  504.         msg = "bulk_create() can only be used with concrete fields in update_fields."
    
  505.         # Reverse one-to-one relationship.
    
  506.         with self.assertRaisesMessage(ValueError, msg):
    
  507.             Country.objects.bulk_create(
    
  508.                 self.data,
    
  509.                 update_conflicts=True,
    
  510.                 update_fields=["relatedmodel"],
    
  511.                 unique_fields=["pk"],
    
  512.             )
    
  513.         # Many-to-many relationship.
    
  514.         with self.assertRaisesMessage(ValueError, msg):
    
  515.             RelatedModel.objects.bulk_create(
    
  516.                 [RelatedModel(country=self.data[0])],
    
  517.                 update_conflicts=True,
    
  518.                 update_fields=["big_auto_fields"],
    
  519.                 unique_fields=["country"],
    
  520.             )
    
  521. 
    
  522.     @skipUnlessDBFeature(
    
  523.         "supports_update_conflicts",
    
  524.         "supports_update_conflicts_with_target",
    
  525.     )
    
  526.     def test_update_conflicts_pk_in_update_fields(self):
    
  527.         msg = "bulk_create() cannot be used with primary keys in update_fields."
    
  528.         with self.assertRaisesMessage(ValueError, msg):
    
  529.             BigAutoFieldModel.objects.bulk_create(
    
  530.                 [BigAutoFieldModel()],
    
  531.                 update_conflicts=True,
    
  532.                 update_fields=["id"],
    
  533.                 unique_fields=["id"],
    
  534.             )
    
  535. 
    
  536.     @skipUnlessDBFeature(
    
  537.         "supports_update_conflicts",
    
  538.         "supports_update_conflicts_with_target",
    
  539.     )
    
  540.     def test_update_conflicts_invalid_unique_fields(self):
    
  541.         msg = "bulk_create() can only be used with concrete fields in unique_fields."
    
  542.         # Reverse one-to-one relationship.
    
  543.         with self.assertRaisesMessage(ValueError, msg):
    
  544.             Country.objects.bulk_create(
    
  545.                 self.data,
    
  546.                 update_conflicts=True,
    
  547.                 update_fields=["name"],
    
  548.                 unique_fields=["relatedmodel"],
    
  549.             )
    
  550.         # Many-to-many relationship.
    
  551.         with self.assertRaisesMessage(ValueError, msg):
    
  552.             RelatedModel.objects.bulk_create(
    
  553.                 [RelatedModel(country=self.data[0])],
    
  554.                 update_conflicts=True,
    
  555.                 update_fields=["name"],
    
  556.                 unique_fields=["big_auto_fields"],
    
  557.             )
    
  558. 
    
  559.     def _test_update_conflicts_two_fields(self, unique_fields):
    
  560.         TwoFields.objects.bulk_create(
    
  561.             [
    
  562.                 TwoFields(f1=1, f2=1, name="a"),
    
  563.                 TwoFields(f1=2, f2=2, name="b"),
    
  564.             ]
    
  565.         )
    
  566.         self.assertEqual(TwoFields.objects.count(), 2)
    
  567. 
    
  568.         conflicting_objects = [
    
  569.             TwoFields(f1=1, f2=1, name="c"),
    
  570.             TwoFields(f1=2, f2=2, name="d"),
    
  571.         ]
    
  572.         TwoFields.objects.bulk_create(
    
  573.             conflicting_objects,
    
  574.             update_conflicts=True,
    
  575.             unique_fields=unique_fields,
    
  576.             update_fields=["name"],
    
  577.         )
    
  578.         self.assertEqual(TwoFields.objects.count(), 2)
    
  579.         self.assertCountEqual(
    
  580.             TwoFields.objects.values("f1", "f2", "name"),
    
  581.             [
    
  582.                 {"f1": 1, "f2": 1, "name": "c"},
    
  583.                 {"f1": 2, "f2": 2, "name": "d"},
    
  584.             ],
    
  585.         )
    
  586. 
    
  587.     @skipUnlessDBFeature(
    
  588.         "supports_update_conflicts", "supports_update_conflicts_with_target"
    
  589.     )
    
  590.     def test_update_conflicts_two_fields_unique_fields_first(self):
    
  591.         self._test_update_conflicts_two_fields(["f1"])
    
  592. 
    
  593.     @skipUnlessDBFeature(
    
  594.         "supports_update_conflicts", "supports_update_conflicts_with_target"
    
  595.     )
    
  596.     def test_update_conflicts_two_fields_unique_fields_second(self):
    
  597.         self._test_update_conflicts_two_fields(["f2"])
    
  598. 
    
  599.     @skipUnlessDBFeature(
    
  600.         "supports_update_conflicts", "supports_update_conflicts_with_target"
    
  601.     )
    
  602.     def test_update_conflicts_unique_fields_pk(self):
    
  603.         TwoFields.objects.bulk_create(
    
  604.             [
    
  605.                 TwoFields(f1=1, f2=1, name="a"),
    
  606.                 TwoFields(f1=2, f2=2, name="b"),
    
  607.             ]
    
  608.         )
    
  609.         self.assertEqual(TwoFields.objects.count(), 2)
    
  610. 
    
  611.         obj1 = TwoFields.objects.get(f1=1)
    
  612.         obj2 = TwoFields.objects.get(f1=2)
    
  613.         conflicting_objects = [
    
  614.             TwoFields(pk=obj1.pk, f1=3, f2=3, name="c"),
    
  615.             TwoFields(pk=obj2.pk, f1=4, f2=4, name="d"),
    
  616.         ]
    
  617.         TwoFields.objects.bulk_create(
    
  618.             conflicting_objects,
    
  619.             update_conflicts=True,
    
  620.             unique_fields=["pk"],
    
  621.             update_fields=["name"],
    
  622.         )
    
  623.         self.assertEqual(TwoFields.objects.count(), 2)
    
  624.         self.assertCountEqual(
    
  625.             TwoFields.objects.values("f1", "f2", "name"),
    
  626.             [
    
  627.                 {"f1": 1, "f2": 1, "name": "c"},
    
  628.                 {"f1": 2, "f2": 2, "name": "d"},
    
  629.             ],
    
  630.         )
    
  631. 
    
  632.     @skipUnlessDBFeature(
    
  633.         "supports_update_conflicts", "supports_update_conflicts_with_target"
    
  634.     )
    
  635.     def test_update_conflicts_two_fields_unique_fields_both(self):
    
  636.         with self.assertRaises((OperationalError, ProgrammingError)):
    
  637.             self._test_update_conflicts_two_fields(["f1", "f2"])
    
  638. 
    
  639.     @skipUnlessDBFeature("supports_update_conflicts")
    
  640.     @skipIfDBFeature("supports_update_conflicts_with_target")
    
  641.     def test_update_conflicts_two_fields_no_unique_fields(self):
    
  642.         self._test_update_conflicts_two_fields([])
    
  643. 
    
  644.     def _test_update_conflicts_unique_two_fields(self, unique_fields):
    
  645.         Country.objects.bulk_create(self.data)
    
  646.         self.assertEqual(Country.objects.count(), 4)
    
  647. 
    
  648.         new_data = [
    
  649.             # Conflicting countries.
    
  650.             Country(
    
  651.                 name="Germany",
    
  652.                 iso_two_letter="DE",
    
  653.                 description=("Germany is a country in Central Europe."),
    
  654.             ),
    
  655.             Country(
    
  656.                 name="Czech Republic",
    
  657.                 iso_two_letter="CZ",
    
  658.                 description=(
    
  659.                     "The Czech Republic is a landlocked country in Central Europe."
    
  660.                 ),
    
  661.             ),
    
  662.             # New countries.
    
  663.             Country(name="Australia", iso_two_letter="AU"),
    
  664.             Country(
    
  665.                 name="Japan",
    
  666.                 iso_two_letter="JP",
    
  667.                 description=("Japan is an island country in East Asia."),
    
  668.             ),
    
  669.         ]
    
  670.         Country.objects.bulk_create(
    
  671.             new_data,
    
  672.             update_conflicts=True,
    
  673.             update_fields=["description"],
    
  674.             unique_fields=unique_fields,
    
  675.         )
    
  676.         self.assertEqual(Country.objects.count(), 6)
    
  677.         self.assertCountEqual(
    
  678.             Country.objects.values("iso_two_letter", "description"),
    
  679.             [
    
  680.                 {"iso_two_letter": "US", "description": ""},
    
  681.                 {"iso_two_letter": "NL", "description": ""},
    
  682.                 {
    
  683.                     "iso_two_letter": "DE",
    
  684.                     "description": ("Germany is a country in Central Europe."),
    
  685.                 },
    
  686.                 {
    
  687.                     "iso_two_letter": "CZ",
    
  688.                     "description": (
    
  689.                         "The Czech Republic is a landlocked country in Central Europe."
    
  690.                     ),
    
  691.                 },
    
  692.                 {"iso_two_letter": "AU", "description": ""},
    
  693.                 {
    
  694.                     "iso_two_letter": "JP",
    
  695.                     "description": ("Japan is an island country in East Asia."),
    
  696.                 },
    
  697.             ],
    
  698.         )
    
  699. 
    
  700.     @skipUnlessDBFeature(
    
  701.         "supports_update_conflicts", "supports_update_conflicts_with_target"
    
  702.     )
    
  703.     def test_update_conflicts_unique_two_fields_unique_fields_both(self):
    
  704.         self._test_update_conflicts_unique_two_fields(["iso_two_letter", "name"])
    
  705. 
    
  706.     @skipUnlessDBFeature(
    
  707.         "supports_update_conflicts", "supports_update_conflicts_with_target"
    
  708.     )
    
  709.     def test_update_conflicts_unique_two_fields_unique_fields_one(self):
    
  710.         with self.assertRaises((OperationalError, ProgrammingError)):
    
  711.             self._test_update_conflicts_unique_two_fields(["iso_two_letter"])
    
  712. 
    
  713.     @skipUnlessDBFeature("supports_update_conflicts")
    
  714.     @skipIfDBFeature("supports_update_conflicts_with_target")
    
  715.     def test_update_conflicts_unique_two_fields_unique_no_unique_fields(self):
    
  716.         self._test_update_conflicts_unique_two_fields([])
    
  717. 
    
  718.     def _test_update_conflicts(self, unique_fields):
    
  719.         UpsertConflict.objects.bulk_create(
    
  720.             [
    
  721.                 UpsertConflict(number=1, rank=1, name="John"),
    
  722.                 UpsertConflict(number=2, rank=2, name="Mary"),
    
  723.                 UpsertConflict(number=3, rank=3, name="Hannah"),
    
  724.             ]
    
  725.         )
    
  726.         self.assertEqual(UpsertConflict.objects.count(), 3)
    
  727. 
    
  728.         conflicting_objects = [
    
  729.             UpsertConflict(number=1, rank=4, name="Steve"),
    
  730.             UpsertConflict(number=2, rank=2, name="Olivia"),
    
  731.             UpsertConflict(number=3, rank=1, name="Hannah"),
    
  732.         ]
    
  733.         UpsertConflict.objects.bulk_create(
    
  734.             conflicting_objects,
    
  735.             update_conflicts=True,
    
  736.             update_fields=["name", "rank"],
    
  737.             unique_fields=unique_fields,
    
  738.         )
    
  739.         self.assertEqual(UpsertConflict.objects.count(), 3)
    
  740.         self.assertCountEqual(
    
  741.             UpsertConflict.objects.values("number", "rank", "name"),
    
  742.             [
    
  743.                 {"number": 1, "rank": 4, "name": "Steve"},
    
  744.                 {"number": 2, "rank": 2, "name": "Olivia"},
    
  745.                 {"number": 3, "rank": 1, "name": "Hannah"},
    
  746.             ],
    
  747.         )
    
  748. 
    
  749.         UpsertConflict.objects.bulk_create(
    
  750.             conflicting_objects + [UpsertConflict(number=4, rank=4, name="Mark")],
    
  751.             update_conflicts=True,
    
  752.             update_fields=["name", "rank"],
    
  753.             unique_fields=unique_fields,
    
  754.         )
    
  755.         self.assertEqual(UpsertConflict.objects.count(), 4)
    
  756.         self.assertCountEqual(
    
  757.             UpsertConflict.objects.values("number", "rank", "name"),
    
  758.             [
    
  759.                 {"number": 1, "rank": 4, "name": "Steve"},
    
  760.                 {"number": 2, "rank": 2, "name": "Olivia"},
    
  761.                 {"number": 3, "rank": 1, "name": "Hannah"},
    
  762.                 {"number": 4, "rank": 4, "name": "Mark"},
    
  763.             ],
    
  764.         )
    
  765. 
    
  766.     @skipUnlessDBFeature(
    
  767.         "supports_update_conflicts", "supports_update_conflicts_with_target"
    
  768.     )
    
  769.     def test_update_conflicts_unique_fields(self):
    
  770.         self._test_update_conflicts(unique_fields=["number"])
    
  771. 
    
  772.     @skipUnlessDBFeature("supports_update_conflicts")
    
  773.     @skipIfDBFeature("supports_update_conflicts_with_target")
    
  774.     def test_update_conflicts_no_unique_fields(self):
    
  775.         self._test_update_conflicts([])
    
  776. 
    
  777.     @skipUnlessDBFeature(
    
  778.         "supports_update_conflicts", "supports_update_conflicts_with_target"
    
  779.     )
    
  780.     def test_update_conflicts_unique_fields_update_fields_db_column(self):
    
  781.         FieldsWithDbColumns.objects.bulk_create(
    
  782.             [
    
  783.                 FieldsWithDbColumns(rank=1, name="a"),
    
  784.                 FieldsWithDbColumns(rank=2, name="b"),
    
  785.             ]
    
  786.         )
    
  787.         self.assertEqual(FieldsWithDbColumns.objects.count(), 2)
    
  788. 
    
  789.         conflicting_objects = [
    
  790.             FieldsWithDbColumns(rank=1, name="c"),
    
  791.             FieldsWithDbColumns(rank=2, name="d"),
    
  792.         ]
    
  793.         FieldsWithDbColumns.objects.bulk_create(
    
  794.             conflicting_objects,
    
  795.             update_conflicts=True,
    
  796.             unique_fields=["rank"],
    
  797.             update_fields=["name"],
    
  798.         )
    
  799.         self.assertEqual(FieldsWithDbColumns.objects.count(), 2)
    
  800.         self.assertCountEqual(
    
  801.             FieldsWithDbColumns.objects.values("rank", "name"),
    
  802.             [
    
  803.                 {"rank": 1, "name": "c"},
    
  804.                 {"rank": 2, "name": "d"},
    
  805.             ],
    
  806.         )