1. import time
    
  2. import traceback
    
  3. from datetime import date, datetime, timedelta
    
  4. from threading import Thread
    
  5. 
    
  6. from django.core.exceptions import FieldError
    
  7. from django.db import DatabaseError, IntegrityError, connection
    
  8. from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
    
  9. from django.utils.functional import lazy
    
  10. 
    
  11. from .models import (
    
  12.     Author,
    
  13.     Book,
    
  14.     DefaultPerson,
    
  15.     ManualPrimaryKeyTest,
    
  16.     Person,
    
  17.     Profile,
    
  18.     Publisher,
    
  19.     Tag,
    
  20.     Thing,
    
  21. )
    
  22. 
    
  23. 
    
  24. class GetOrCreateTests(TestCase):
    
  25.     @classmethod
    
  26.     def setUpTestData(cls):
    
  27.         Person.objects.create(
    
  28.             first_name="John", last_name="Lennon", birthday=date(1940, 10, 9)
    
  29.         )
    
  30. 
    
  31.     def test_get_or_create_method_with_get(self):
    
  32.         created = Person.objects.get_or_create(
    
  33.             first_name="John",
    
  34.             last_name="Lennon",
    
  35.             defaults={"birthday": date(1940, 10, 9)},
    
  36.         )[1]
    
  37.         self.assertFalse(created)
    
  38.         self.assertEqual(Person.objects.count(), 1)
    
  39. 
    
  40.     def test_get_or_create_method_with_create(self):
    
  41.         created = Person.objects.get_or_create(
    
  42.             first_name="George",
    
  43.             last_name="Harrison",
    
  44.             defaults={"birthday": date(1943, 2, 25)},
    
  45.         )[1]
    
  46.         self.assertTrue(created)
    
  47.         self.assertEqual(Person.objects.count(), 2)
    
  48. 
    
  49.     def test_get_or_create_redundant_instance(self):
    
  50.         """
    
  51.         If we execute the exact same statement twice, the second time,
    
  52.         it won't create a Person.
    
  53.         """
    
  54.         Person.objects.get_or_create(
    
  55.             first_name="George",
    
  56.             last_name="Harrison",
    
  57.             defaults={"birthday": date(1943, 2, 25)},
    
  58.         )
    
  59.         created = Person.objects.get_or_create(
    
  60.             first_name="George",
    
  61.             last_name="Harrison",
    
  62.             defaults={"birthday": date(1943, 2, 25)},
    
  63.         )[1]
    
  64. 
    
  65.         self.assertFalse(created)
    
  66.         self.assertEqual(Person.objects.count(), 2)
    
  67. 
    
  68.     def test_get_or_create_invalid_params(self):
    
  69.         """
    
  70.         If you don't specify a value or default value for all required
    
  71.         fields, you will get an error.
    
  72.         """
    
  73.         with self.assertRaises(IntegrityError):
    
  74.             Person.objects.get_or_create(first_name="Tom", last_name="Smith")
    
  75. 
    
  76.     def test_get_or_create_with_pk_property(self):
    
  77.         """
    
  78.         Using the pk property of a model is allowed.
    
  79.         """
    
  80.         Thing.objects.get_or_create(pk=1)
    
  81. 
    
  82.     def test_get_or_create_with_model_property_defaults(self):
    
  83.         """Using a property with a setter implemented is allowed."""
    
  84.         t, _ = Thing.objects.get_or_create(
    
  85.             defaults={"capitalized_name_property": "annie"}, pk=1
    
  86.         )
    
  87.         self.assertEqual(t.name, "Annie")
    
  88. 
    
  89.     def test_get_or_create_on_related_manager(self):
    
  90.         p = Publisher.objects.create(name="Acme Publishing")
    
  91.         # Create a book through the publisher.
    
  92.         book, created = p.books.get_or_create(name="The Book of Ed & Fred")
    
  93.         self.assertTrue(created)
    
  94.         # The publisher should have one book.
    
  95.         self.assertEqual(p.books.count(), 1)
    
  96. 
    
  97.         # Try get_or_create again, this time nothing should be created.
    
  98.         book, created = p.books.get_or_create(name="The Book of Ed & Fred")
    
  99.         self.assertFalse(created)
    
  100.         # And the publisher should still have one book.
    
  101.         self.assertEqual(p.books.count(), 1)
    
  102. 
    
  103.         # Add an author to the book.
    
  104.         ed, created = book.authors.get_or_create(name="Ed")
    
  105.         self.assertTrue(created)
    
  106.         # The book should have one author.
    
  107.         self.assertEqual(book.authors.count(), 1)
    
  108. 
    
  109.         # Try get_or_create again, this time nothing should be created.
    
  110.         ed, created = book.authors.get_or_create(name="Ed")
    
  111.         self.assertFalse(created)
    
  112.         # And the book should still have one author.
    
  113.         self.assertEqual(book.authors.count(), 1)
    
  114. 
    
  115.         # Add a second author to the book.
    
  116.         fred, created = book.authors.get_or_create(name="Fred")
    
  117.         self.assertTrue(created)
    
  118. 
    
  119.         # The book should have two authors now.
    
  120.         self.assertEqual(book.authors.count(), 2)
    
  121. 
    
  122.         # Create an Author not tied to any books.
    
  123.         Author.objects.create(name="Ted")
    
  124. 
    
  125.         # There should be three Authors in total. The book object should have two.
    
  126.         self.assertEqual(Author.objects.count(), 3)
    
  127.         self.assertEqual(book.authors.count(), 2)
    
  128. 
    
  129.         # Try creating a book through an author.
    
  130.         _, created = ed.books.get_or_create(name="Ed's Recipes", publisher=p)
    
  131.         self.assertTrue(created)
    
  132. 
    
  133.         # Now Ed has two Books, Fred just one.
    
  134.         self.assertEqual(ed.books.count(), 2)
    
  135.         self.assertEqual(fred.books.count(), 1)
    
  136. 
    
  137.         # Use the publisher's primary key value instead of a model instance.
    
  138.         _, created = ed.books.get_or_create(
    
  139.             name="The Great Book of Ed", publisher_id=p.id
    
  140.         )
    
  141.         self.assertTrue(created)
    
  142. 
    
  143.         # Try get_or_create again, this time nothing should be created.
    
  144.         _, created = ed.books.get_or_create(
    
  145.             name="The Great Book of Ed", publisher_id=p.id
    
  146.         )
    
  147.         self.assertFalse(created)
    
  148. 
    
  149.         # The publisher should have three books.
    
  150.         self.assertEqual(p.books.count(), 3)
    
  151. 
    
  152.     def test_defaults_exact(self):
    
  153.         """
    
  154.         If you have a field named defaults and want to use it as an exact
    
  155.         lookup, you need to use 'defaults__exact'.
    
  156.         """
    
  157.         obj, created = Person.objects.get_or_create(
    
  158.             first_name="George",
    
  159.             last_name="Harrison",
    
  160.             defaults__exact="testing",
    
  161.             defaults={
    
  162.                 "birthday": date(1943, 2, 25),
    
  163.                 "defaults": "testing",
    
  164.             },
    
  165.         )
    
  166.         self.assertTrue(created)
    
  167.         self.assertEqual(obj.defaults, "testing")
    
  168.         obj2, created = Person.objects.get_or_create(
    
  169.             first_name="George",
    
  170.             last_name="Harrison",
    
  171.             defaults__exact="testing",
    
  172.             defaults={
    
  173.                 "birthday": date(1943, 2, 25),
    
  174.                 "defaults": "testing",
    
  175.             },
    
  176.         )
    
  177.         self.assertFalse(created)
    
  178.         self.assertEqual(obj, obj2)
    
  179. 
    
  180.     def test_callable_defaults(self):
    
  181.         """
    
  182.         Callables in `defaults` are evaluated if the instance is created.
    
  183.         """
    
  184.         obj, created = Person.objects.get_or_create(
    
  185.             first_name="George",
    
  186.             defaults={"last_name": "Harrison", "birthday": lambda: date(1943, 2, 25)},
    
  187.         )
    
  188.         self.assertTrue(created)
    
  189.         self.assertEqual(date(1943, 2, 25), obj.birthday)
    
  190. 
    
  191.     def test_callable_defaults_not_called(self):
    
  192.         def raise_exception():
    
  193.             raise AssertionError
    
  194. 
    
  195.         obj, created = Person.objects.get_or_create(
    
  196.             first_name="John",
    
  197.             last_name="Lennon",
    
  198.             defaults={"birthday": lambda: raise_exception()},
    
  199.         )
    
  200. 
    
  201.     def test_defaults_not_evaluated_unless_needed(self):
    
  202.         """`defaults` aren't evaluated if the instance isn't created."""
    
  203. 
    
  204.         def raise_exception():
    
  205.             raise AssertionError
    
  206. 
    
  207.         obj, created = Person.objects.get_or_create(
    
  208.             first_name="John",
    
  209.             defaults=lazy(raise_exception, object)(),
    
  210.         )
    
  211.         self.assertFalse(created)
    
  212. 
    
  213. 
    
  214. class GetOrCreateTestsWithManualPKs(TestCase):
    
  215.     @classmethod
    
  216.     def setUpTestData(cls):
    
  217.         ManualPrimaryKeyTest.objects.create(id=1, data="Original")
    
  218. 
    
  219.     def test_create_with_duplicate_primary_key(self):
    
  220.         """
    
  221.         If you specify an existing primary key, but different other fields,
    
  222.         then you will get an error and data will not be updated.
    
  223.         """
    
  224.         with self.assertRaises(IntegrityError):
    
  225.             ManualPrimaryKeyTest.objects.get_or_create(id=1, data="Different")
    
  226.         self.assertEqual(ManualPrimaryKeyTest.objects.get(id=1).data, "Original")
    
  227. 
    
  228.     def test_get_or_create_raises_IntegrityError_plus_traceback(self):
    
  229.         """
    
  230.         get_or_create should raise IntegrityErrors with the full traceback.
    
  231.         This is tested by checking that a known method call is in the traceback.
    
  232.         We cannot use assertRaises here because we need to inspect
    
  233.         the actual traceback. Refs #16340.
    
  234.         """
    
  235.         try:
    
  236.             ManualPrimaryKeyTest.objects.get_or_create(id=1, data="Different")
    
  237.         except IntegrityError:
    
  238.             formatted_traceback = traceback.format_exc()
    
  239.             self.assertIn("obj.save", formatted_traceback)
    
  240. 
    
  241.     def test_savepoint_rollback(self):
    
  242.         """
    
  243.         The database connection is still usable after a DatabaseError in
    
  244.         get_or_create() (#20463).
    
  245.         """
    
  246.         Tag.objects.create(text="foo")
    
  247.         with self.assertRaises(DatabaseError):
    
  248.             # pk 123456789 doesn't exist, so the tag object will be created.
    
  249.             # Saving triggers a unique constraint violation on 'text'.
    
  250.             Tag.objects.get_or_create(pk=123456789, defaults={"text": "foo"})
    
  251.         # Tag objects can be created after the error.
    
  252.         Tag.objects.create(text="bar")
    
  253. 
    
  254.     def test_get_or_create_empty(self):
    
  255.         """
    
  256.         If all the attributes on a model have defaults, get_or_create() doesn't
    
  257.         require any arguments.
    
  258.         """
    
  259.         DefaultPerson.objects.get_or_create()
    
  260. 
    
  261. 
    
  262. class GetOrCreateTransactionTests(TransactionTestCase):
    
  263.     available_apps = ["get_or_create"]
    
  264. 
    
  265.     def test_get_or_create_integrityerror(self):
    
  266.         """
    
  267.         Regression test for #15117. Requires a TransactionTestCase on
    
  268.         databases that delay integrity checks until the end of transactions,
    
  269.         otherwise the exception is never raised.
    
  270.         """
    
  271.         try:
    
  272.             Profile.objects.get_or_create(person=Person(id=1))
    
  273.         except IntegrityError:
    
  274.             pass
    
  275.         else:
    
  276.             self.skipTest("This backend does not support integrity checks.")
    
  277. 
    
  278. 
    
  279. class GetOrCreateThroughManyToMany(TestCase):
    
  280.     def test_get_get_or_create(self):
    
  281.         tag = Tag.objects.create(text="foo")
    
  282.         a_thing = Thing.objects.create(name="a")
    
  283.         a_thing.tags.add(tag)
    
  284.         obj, created = a_thing.tags.get_or_create(text="foo")
    
  285. 
    
  286.         self.assertFalse(created)
    
  287.         self.assertEqual(obj.pk, tag.pk)
    
  288. 
    
  289.     def test_create_get_or_create(self):
    
  290.         a_thing = Thing.objects.create(name="a")
    
  291.         obj, created = a_thing.tags.get_or_create(text="foo")
    
  292. 
    
  293.         self.assertTrue(created)
    
  294.         self.assertEqual(obj.text, "foo")
    
  295.         self.assertIn(obj, a_thing.tags.all())
    
  296. 
    
  297.     def test_something(self):
    
  298.         Tag.objects.create(text="foo")
    
  299.         a_thing = Thing.objects.create(name="a")
    
  300.         with self.assertRaises(IntegrityError):
    
  301.             a_thing.tags.get_or_create(text="foo")
    
  302. 
    
  303. 
    
  304. class UpdateOrCreateTests(TestCase):
    
  305.     def test_update(self):
    
  306.         Person.objects.create(
    
  307.             first_name="John", last_name="Lennon", birthday=date(1940, 10, 9)
    
  308.         )
    
  309.         p, created = Person.objects.update_or_create(
    
  310.             first_name="John",
    
  311.             last_name="Lennon",
    
  312.             defaults={"birthday": date(1940, 10, 10)},
    
  313.         )
    
  314.         self.assertFalse(created)
    
  315.         self.assertEqual(p.first_name, "John")
    
  316.         self.assertEqual(p.last_name, "Lennon")
    
  317.         self.assertEqual(p.birthday, date(1940, 10, 10))
    
  318. 
    
  319.     def test_create(self):
    
  320.         p, created = Person.objects.update_or_create(
    
  321.             first_name="John",
    
  322.             last_name="Lennon",
    
  323.             defaults={"birthday": date(1940, 10, 10)},
    
  324.         )
    
  325.         self.assertTrue(created)
    
  326.         self.assertEqual(p.first_name, "John")
    
  327.         self.assertEqual(p.last_name, "Lennon")
    
  328.         self.assertEqual(p.birthday, date(1940, 10, 10))
    
  329. 
    
  330.     def test_create_twice(self):
    
  331.         params = {
    
  332.             "first_name": "John",
    
  333.             "last_name": "Lennon",
    
  334.             "birthday": date(1940, 10, 10),
    
  335.         }
    
  336.         Person.objects.update_or_create(**params)
    
  337.         # If we execute the exact same statement, it won't create a Person.
    
  338.         p, created = Person.objects.update_or_create(**params)
    
  339.         self.assertFalse(created)
    
  340. 
    
  341.     def test_integrity(self):
    
  342.         """
    
  343.         If you don't specify a value or default value for all required
    
  344.         fields, you will get an error.
    
  345.         """
    
  346.         with self.assertRaises(IntegrityError):
    
  347.             Person.objects.update_or_create(first_name="Tom", last_name="Smith")
    
  348. 
    
  349.     def test_manual_primary_key_test(self):
    
  350.         """
    
  351.         If you specify an existing primary key, but different other fields,
    
  352.         then you will get an error and data will not be updated.
    
  353.         """
    
  354.         ManualPrimaryKeyTest.objects.create(id=1, data="Original")
    
  355.         with self.assertRaises(IntegrityError):
    
  356.             ManualPrimaryKeyTest.objects.update_or_create(id=1, data="Different")
    
  357.         self.assertEqual(ManualPrimaryKeyTest.objects.get(id=1).data, "Original")
    
  358. 
    
  359.     def test_with_pk_property(self):
    
  360.         """
    
  361.         Using the pk property of a model is allowed.
    
  362.         """
    
  363.         Thing.objects.update_or_create(pk=1)
    
  364. 
    
  365.     def test_update_or_create_with_model_property_defaults(self):
    
  366.         """Using a property with a setter implemented is allowed."""
    
  367.         t, _ = Thing.objects.get_or_create(
    
  368.             defaults={"capitalized_name_property": "annie"}, pk=1
    
  369.         )
    
  370.         self.assertEqual(t.name, "Annie")
    
  371. 
    
  372.     def test_error_contains_full_traceback(self):
    
  373.         """
    
  374.         update_or_create should raise IntegrityErrors with the full traceback.
    
  375.         This is tested by checking that a known method call is in the traceback.
    
  376.         We cannot use assertRaises/assertRaises here because we need to inspect
    
  377.         the actual traceback. Refs #16340.
    
  378.         """
    
  379.         try:
    
  380.             ManualPrimaryKeyTest.objects.update_or_create(id=1, data="Different")
    
  381.         except IntegrityError:
    
  382.             formatted_traceback = traceback.format_exc()
    
  383.             self.assertIn("obj.save", formatted_traceback)
    
  384. 
    
  385.     def test_create_with_related_manager(self):
    
  386.         """
    
  387.         Should be able to use update_or_create from the related manager to
    
  388.         create a book. Refs #23611.
    
  389.         """
    
  390.         p = Publisher.objects.create(name="Acme Publishing")
    
  391.         book, created = p.books.update_or_create(name="The Book of Ed & Fred")
    
  392.         self.assertTrue(created)
    
  393.         self.assertEqual(p.books.count(), 1)
    
  394. 
    
  395.     def test_update_with_related_manager(self):
    
  396.         """
    
  397.         Should be able to use update_or_create from the related manager to
    
  398.         update a book. Refs #23611.
    
  399.         """
    
  400.         p = Publisher.objects.create(name="Acme Publishing")
    
  401.         book = Book.objects.create(name="The Book of Ed & Fred", publisher=p)
    
  402.         self.assertEqual(p.books.count(), 1)
    
  403.         name = "The Book of Django"
    
  404.         book, created = p.books.update_or_create(defaults={"name": name}, id=book.id)
    
  405.         self.assertFalse(created)
    
  406.         self.assertEqual(book.name, name)
    
  407.         self.assertEqual(p.books.count(), 1)
    
  408. 
    
  409.     def test_create_with_many(self):
    
  410.         """
    
  411.         Should be able to use update_or_create from the m2m related manager to
    
  412.         create a book. Refs #23611.
    
  413.         """
    
  414.         p = Publisher.objects.create(name="Acme Publishing")
    
  415.         author = Author.objects.create(name="Ted")
    
  416.         book, created = author.books.update_or_create(
    
  417.             name="The Book of Ed & Fred", publisher=p
    
  418.         )
    
  419.         self.assertTrue(created)
    
  420.         self.assertEqual(author.books.count(), 1)
    
  421. 
    
  422.     def test_update_with_many(self):
    
  423.         """
    
  424.         Should be able to use update_or_create from the m2m related manager to
    
  425.         update a book. Refs #23611.
    
  426.         """
    
  427.         p = Publisher.objects.create(name="Acme Publishing")
    
  428.         author = Author.objects.create(name="Ted")
    
  429.         book = Book.objects.create(name="The Book of Ed & Fred", publisher=p)
    
  430.         book.authors.add(author)
    
  431.         self.assertEqual(author.books.count(), 1)
    
  432.         name = "The Book of Django"
    
  433.         book, created = author.books.update_or_create(
    
  434.             defaults={"name": name}, id=book.id
    
  435.         )
    
  436.         self.assertFalse(created)
    
  437.         self.assertEqual(book.name, name)
    
  438.         self.assertEqual(author.books.count(), 1)
    
  439. 
    
  440.     def test_defaults_exact(self):
    
  441.         """
    
  442.         If you have a field named defaults and want to use it as an exact
    
  443.         lookup, you need to use 'defaults__exact'.
    
  444.         """
    
  445.         obj, created = Person.objects.update_or_create(
    
  446.             first_name="George",
    
  447.             last_name="Harrison",
    
  448.             defaults__exact="testing",
    
  449.             defaults={
    
  450.                 "birthday": date(1943, 2, 25),
    
  451.                 "defaults": "testing",
    
  452.             },
    
  453.         )
    
  454.         self.assertTrue(created)
    
  455.         self.assertEqual(obj.defaults, "testing")
    
  456.         obj, created = Person.objects.update_or_create(
    
  457.             first_name="George",
    
  458.             last_name="Harrison",
    
  459.             defaults__exact="testing",
    
  460.             defaults={
    
  461.                 "birthday": date(1943, 2, 25),
    
  462.                 "defaults": "another testing",
    
  463.             },
    
  464.         )
    
  465.         self.assertFalse(created)
    
  466.         self.assertEqual(obj.defaults, "another testing")
    
  467. 
    
  468.     def test_create_callable_default(self):
    
  469.         obj, created = Person.objects.update_or_create(
    
  470.             first_name="George",
    
  471.             last_name="Harrison",
    
  472.             defaults={"birthday": lambda: date(1943, 2, 25)},
    
  473.         )
    
  474.         self.assertIs(created, True)
    
  475.         self.assertEqual(obj.birthday, date(1943, 2, 25))
    
  476. 
    
  477.     def test_update_callable_default(self):
    
  478.         Person.objects.update_or_create(
    
  479.             first_name="George",
    
  480.             last_name="Harrison",
    
  481.             birthday=date(1942, 2, 25),
    
  482.         )
    
  483.         obj, created = Person.objects.update_or_create(
    
  484.             first_name="George",
    
  485.             defaults={"last_name": lambda: "NotHarrison"},
    
  486.         )
    
  487.         self.assertIs(created, False)
    
  488.         self.assertEqual(obj.last_name, "NotHarrison")
    
  489. 
    
  490.     def test_defaults_not_evaluated_unless_needed(self):
    
  491.         """`defaults` aren't evaluated if the instance isn't created."""
    
  492.         Person.objects.create(
    
  493.             first_name="John", last_name="Lennon", birthday=date(1940, 10, 9)
    
  494.         )
    
  495. 
    
  496.         def raise_exception():
    
  497.             raise AssertionError
    
  498. 
    
  499.         obj, created = Person.objects.get_or_create(
    
  500.             first_name="John",
    
  501.             defaults=lazy(raise_exception, object)(),
    
  502.         )
    
  503.         self.assertFalse(created)
    
  504. 
    
  505. 
    
  506. class UpdateOrCreateTestsWithManualPKs(TestCase):
    
  507.     def test_create_with_duplicate_primary_key(self):
    
  508.         """
    
  509.         If an existing primary key is specified with different values for other
    
  510.         fields, then IntegrityError is raised and data isn't updated.
    
  511.         """
    
  512.         ManualPrimaryKeyTest.objects.create(id=1, data="Original")
    
  513.         with self.assertRaises(IntegrityError):
    
  514.             ManualPrimaryKeyTest.objects.update_or_create(id=1, data="Different")
    
  515.         self.assertEqual(ManualPrimaryKeyTest.objects.get(id=1).data, "Original")
    
  516. 
    
  517. 
    
  518. class UpdateOrCreateTransactionTests(TransactionTestCase):
    
  519.     available_apps = ["get_or_create"]
    
  520. 
    
  521.     @skipUnlessDBFeature("has_select_for_update")
    
  522.     @skipUnlessDBFeature("supports_transactions")
    
  523.     def test_updates_in_transaction(self):
    
  524.         """
    
  525.         Objects are selected and updated in a transaction to avoid race
    
  526.         conditions. This test forces update_or_create() to hold the lock
    
  527.         in another thread for a relatively long time so that it can update
    
  528.         while it holds the lock. The updated field isn't a field in 'defaults',
    
  529.         so update_or_create() shouldn't have an effect on it.
    
  530.         """
    
  531.         lock_status = {"has_grabbed_lock": False}
    
  532. 
    
  533.         def birthday_sleep():
    
  534.             lock_status["has_grabbed_lock"] = True
    
  535.             time.sleep(0.5)
    
  536.             return date(1940, 10, 10)
    
  537. 
    
  538.         def update_birthday_slowly():
    
  539.             Person.objects.update_or_create(
    
  540.                 first_name="John", defaults={"birthday": birthday_sleep}
    
  541.             )
    
  542.             # Avoid leaking connection for Oracle
    
  543.             connection.close()
    
  544. 
    
  545.         def lock_wait():
    
  546.             # timeout after ~0.5 seconds
    
  547.             for i in range(20):
    
  548.                 time.sleep(0.025)
    
  549.                 if lock_status["has_grabbed_lock"]:
    
  550.                     return True
    
  551.             return False
    
  552. 
    
  553.         Person.objects.create(
    
  554.             first_name="John", last_name="Lennon", birthday=date(1940, 10, 9)
    
  555.         )
    
  556. 
    
  557.         # update_or_create in a separate thread
    
  558.         t = Thread(target=update_birthday_slowly)
    
  559.         before_start = datetime.now()
    
  560.         t.start()
    
  561. 
    
  562.         if not lock_wait():
    
  563.             self.skipTest("Database took too long to lock the row")
    
  564. 
    
  565.         # Update during lock
    
  566.         Person.objects.filter(first_name="John").update(last_name="NotLennon")
    
  567.         after_update = datetime.now()
    
  568. 
    
  569.         # Wait for thread to finish
    
  570.         t.join()
    
  571. 
    
  572.         # The update remains and it blocked.
    
  573.         updated_person = Person.objects.get(first_name="John")
    
  574.         self.assertGreater(after_update - before_start, timedelta(seconds=0.5))
    
  575.         self.assertEqual(updated_person.last_name, "NotLennon")
    
  576. 
    
  577.     @skipUnlessDBFeature("has_select_for_update")
    
  578.     @skipUnlessDBFeature("supports_transactions")
    
  579.     def test_creation_in_transaction(self):
    
  580.         """
    
  581.         Objects are selected and updated in a transaction to avoid race
    
  582.         conditions. This test checks the behavior of update_or_create() when
    
  583.         the object doesn't already exist, but another thread creates the
    
  584.         object before update_or_create() does and then attempts to update the
    
  585.         object, also before update_or_create(). It forces update_or_create() to
    
  586.         hold the lock in another thread for a relatively long time so that it
    
  587.         can update while it holds the lock. The updated field isn't a field in
    
  588.         'defaults', so update_or_create() shouldn't have an effect on it.
    
  589.         """
    
  590.         lock_status = {"lock_count": 0}
    
  591. 
    
  592.         def birthday_sleep():
    
  593.             lock_status["lock_count"] += 1
    
  594.             time.sleep(0.5)
    
  595.             return date(1940, 10, 10)
    
  596. 
    
  597.         def update_birthday_slowly():
    
  598.             try:
    
  599.                 Person.objects.update_or_create(
    
  600.                     first_name="John", defaults={"birthday": birthday_sleep}
    
  601.                 )
    
  602.             finally:
    
  603.                 # Avoid leaking connection for Oracle
    
  604.                 connection.close()
    
  605. 
    
  606.         def lock_wait(expected_lock_count):
    
  607.             # timeout after ~0.5 seconds
    
  608.             for i in range(20):
    
  609.                 time.sleep(0.025)
    
  610.                 if lock_status["lock_count"] == expected_lock_count:
    
  611.                     return True
    
  612.             self.skipTest("Database took too long to lock the row")
    
  613. 
    
  614.         # update_or_create in a separate thread.
    
  615.         t = Thread(target=update_birthday_slowly)
    
  616.         before_start = datetime.now()
    
  617.         t.start()
    
  618.         lock_wait(1)
    
  619.         # Create object *after* initial attempt by update_or_create to get obj
    
  620.         # but before creation attempt.
    
  621.         Person.objects.create(
    
  622.             first_name="John", last_name="Lennon", birthday=date(1940, 10, 9)
    
  623.         )
    
  624.         lock_wait(2)
    
  625.         # At this point, the thread is pausing for 0.5 seconds, so now attempt
    
  626.         # to modify object before update_or_create() calls save(). This should
    
  627.         # be blocked until after the save().
    
  628.         Person.objects.filter(first_name="John").update(last_name="NotLennon")
    
  629.         after_update = datetime.now()
    
  630.         # Wait for thread to finish
    
  631.         t.join()
    
  632.         # Check call to update_or_create() succeeded and the subsequent
    
  633.         # (blocked) call to update().
    
  634.         updated_person = Person.objects.get(first_name="John")
    
  635.         self.assertEqual(
    
  636.             updated_person.birthday, date(1940, 10, 10)
    
  637.         )  # set by update_or_create()
    
  638.         self.assertEqual(updated_person.last_name, "NotLennon")  # set by update()
    
  639.         self.assertGreater(after_update - before_start, timedelta(seconds=1))
    
  640. 
    
  641. 
    
  642. class InvalidCreateArgumentsTests(TransactionTestCase):
    
  643.     available_apps = ["get_or_create"]
    
  644.     msg = "Invalid field name(s) for model Thing: 'nonexistent'."
    
  645.     bad_field_msg = (
    
  646.         "Cannot resolve keyword 'nonexistent' into field. Choices are: id, name, tags"
    
  647.     )
    
  648. 
    
  649.     def test_get_or_create_with_invalid_defaults(self):
    
  650.         with self.assertRaisesMessage(FieldError, self.msg):
    
  651.             Thing.objects.get_or_create(name="a", defaults={"nonexistent": "b"})
    
  652. 
    
  653.     def test_get_or_create_with_invalid_kwargs(self):
    
  654.         with self.assertRaisesMessage(FieldError, self.bad_field_msg):
    
  655.             Thing.objects.get_or_create(name="a", nonexistent="b")
    
  656. 
    
  657.     def test_update_or_create_with_invalid_defaults(self):
    
  658.         with self.assertRaisesMessage(FieldError, self.msg):
    
  659.             Thing.objects.update_or_create(name="a", defaults={"nonexistent": "b"})
    
  660. 
    
  661.     def test_update_or_create_with_invalid_kwargs(self):
    
  662.         with self.assertRaisesMessage(FieldError, self.bad_field_msg):
    
  663.             Thing.objects.update_or_create(name="a", nonexistent="b")
    
  664. 
    
  665.     def test_multiple_invalid_fields(self):
    
  666.         with self.assertRaisesMessage(FieldError, self.bad_field_msg):
    
  667.             Thing.objects.update_or_create(
    
  668.                 name="a", nonexistent="b", defaults={"invalid": "c"}
    
  669.             )
    
  670. 
    
  671.     def test_property_attribute_without_setter_defaults(self):
    
  672.         with self.assertRaisesMessage(
    
  673.             FieldError, "Invalid field name(s) for model Thing: 'name_in_all_caps'"
    
  674.         ):
    
  675.             Thing.objects.update_or_create(
    
  676.                 name="a", defaults={"name_in_all_caps": "FRANK"}
    
  677.             )
    
  678. 
    
  679.     def test_property_attribute_without_setter_kwargs(self):
    
  680.         msg = (
    
  681.             "Cannot resolve keyword 'name_in_all_caps' into field. Choices are: id, "
    
  682.             "name, tags"
    
  683.         )
    
  684.         with self.assertRaisesMessage(FieldError, msg):
    
  685.             Thing.objects.update_or_create(
    
  686.                 name_in_all_caps="FRANK", defaults={"name": "Frank"}
    
  687.             )