1. import threading
    
  2. import time
    
  3. from unittest import mock
    
  4. 
    
  5. from multiple_database.routers import TestRouter
    
  6. 
    
  7. from django.core.exceptions import FieldError
    
  8. from django.db import (
    
  9.     DatabaseError,
    
  10.     NotSupportedError,
    
  11.     connection,
    
  12.     connections,
    
  13.     router,
    
  14.     transaction,
    
  15. )
    
  16. from django.test import (
    
  17.     TransactionTestCase,
    
  18.     override_settings,
    
  19.     skipIfDBFeature,
    
  20.     skipUnlessDBFeature,
    
  21. )
    
  22. from django.test.utils import CaptureQueriesContext
    
  23. 
    
  24. from .models import (
    
  25.     City,
    
  26.     CityCountryProxy,
    
  27.     Country,
    
  28.     EUCity,
    
  29.     EUCountry,
    
  30.     Person,
    
  31.     PersonProfile,
    
  32. )
    
  33. 
    
  34. 
    
  35. class SelectForUpdateTests(TransactionTestCase):
    
  36.     available_apps = ["select_for_update"]
    
  37. 
    
  38.     def setUp(self):
    
  39.         # This is executed in autocommit mode so that code in
    
  40.         # run_select_for_update can see this data.
    
  41.         self.country1 = Country.objects.create(name="Belgium")
    
  42.         self.country2 = Country.objects.create(name="France")
    
  43.         self.city1 = City.objects.create(name="Liberchies", country=self.country1)
    
  44.         self.city2 = City.objects.create(name="Samois-sur-Seine", country=self.country2)
    
  45.         self.person = Person.objects.create(
    
  46.             name="Reinhardt", born=self.city1, died=self.city2
    
  47.         )
    
  48.         self.person_profile = PersonProfile.objects.create(person=self.person)
    
  49. 
    
  50.         # We need another database connection in transaction to test that one
    
  51.         # connection issuing a SELECT ... FOR UPDATE will block.
    
  52.         self.new_connection = connection.copy()
    
  53. 
    
  54.     def tearDown(self):
    
  55.         try:
    
  56.             self.end_blocking_transaction()
    
  57.         except (DatabaseError, AttributeError):
    
  58.             pass
    
  59.         self.new_connection.close()
    
  60. 
    
  61.     def start_blocking_transaction(self):
    
  62.         self.new_connection.set_autocommit(False)
    
  63.         # Start a blocking transaction. At some point,
    
  64.         # end_blocking_transaction() should be called.
    
  65.         self.cursor = self.new_connection.cursor()
    
  66.         sql = "SELECT * FROM %(db_table)s %(for_update)s;" % {
    
  67.             "db_table": Person._meta.db_table,
    
  68.             "for_update": self.new_connection.ops.for_update_sql(),
    
  69.         }
    
  70.         self.cursor.execute(sql, ())
    
  71.         self.cursor.fetchone()
    
  72. 
    
  73.     def end_blocking_transaction(self):
    
  74.         # Roll back the blocking transaction.
    
  75.         self.cursor.close()
    
  76.         self.new_connection.rollback()
    
  77.         self.new_connection.set_autocommit(True)
    
  78. 
    
  79.     def has_for_update_sql(self, queries, **kwargs):
    
  80.         # Examine the SQL that was executed to determine whether it
    
  81.         # contains the 'SELECT..FOR UPDATE' stanza.
    
  82.         for_update_sql = connection.ops.for_update_sql(**kwargs)
    
  83.         return any(for_update_sql in query["sql"] for query in queries)
    
  84. 
    
  85.     @skipUnlessDBFeature("has_select_for_update")
    
  86.     def test_for_update_sql_generated(self):
    
  87.         """
    
  88.         The backend's FOR UPDATE variant appears in
    
  89.         generated SQL when select_for_update is invoked.
    
  90.         """
    
  91.         with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
    
  92.             list(Person.objects.select_for_update())
    
  93.         self.assertTrue(self.has_for_update_sql(ctx.captured_queries))
    
  94. 
    
  95.     @skipUnlessDBFeature("has_select_for_update_nowait")
    
  96.     def test_for_update_sql_generated_nowait(self):
    
  97.         """
    
  98.         The backend's FOR UPDATE NOWAIT variant appears in
    
  99.         generated SQL when select_for_update is invoked.
    
  100.         """
    
  101.         with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
    
  102.             list(Person.objects.select_for_update(nowait=True))
    
  103.         self.assertTrue(self.has_for_update_sql(ctx.captured_queries, nowait=True))
    
  104. 
    
  105.     @skipUnlessDBFeature("has_select_for_update_skip_locked")
    
  106.     def test_for_update_sql_generated_skip_locked(self):
    
  107.         """
    
  108.         The backend's FOR UPDATE SKIP LOCKED variant appears in
    
  109.         generated SQL when select_for_update is invoked.
    
  110.         """
    
  111.         with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
    
  112.             list(Person.objects.select_for_update(skip_locked=True))
    
  113.         self.assertTrue(self.has_for_update_sql(ctx.captured_queries, skip_locked=True))
    
  114. 
    
  115.     @skipUnlessDBFeature("has_select_for_no_key_update")
    
  116.     def test_update_sql_generated_no_key(self):
    
  117.         """
    
  118.         The backend's FOR NO KEY UPDATE variant appears in generated SQL when
    
  119.         select_for_update() is invoked.
    
  120.         """
    
  121.         with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
    
  122.             list(Person.objects.select_for_update(no_key=True))
    
  123.         self.assertIs(self.has_for_update_sql(ctx.captured_queries, no_key=True), True)
    
  124. 
    
  125.     @skipUnlessDBFeature("has_select_for_update_of")
    
  126.     def test_for_update_sql_generated_of(self):
    
  127.         """
    
  128.         The backend's FOR UPDATE OF variant appears in the generated SQL when
    
  129.         select_for_update() is invoked.
    
  130.         """
    
  131.         with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
    
  132.             list(
    
  133.                 Person.objects.select_related(
    
  134.                     "born__country",
    
  135.                 )
    
  136.                 .select_for_update(
    
  137.                     of=("born__country",),
    
  138.                 )
    
  139.                 .select_for_update(of=("self", "born__country"))
    
  140.             )
    
  141.         features = connections["default"].features
    
  142.         if features.select_for_update_of_column:
    
  143.             expected = [
    
  144.                 'select_for_update_person"."id',
    
  145.                 'select_for_update_country"."entity_ptr_id',
    
  146.             ]
    
  147.         else:
    
  148.             expected = ["select_for_update_person", "select_for_update_country"]
    
  149.         expected = [connection.ops.quote_name(value) for value in expected]
    
  150.         self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected))
    
  151. 
    
  152.     @skipUnlessDBFeature("has_select_for_update_of")
    
  153.     def test_for_update_sql_model_inheritance_generated_of(self):
    
  154.         with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
    
  155.             list(EUCountry.objects.select_for_update(of=("self",)))
    
  156.         if connection.features.select_for_update_of_column:
    
  157.             expected = ['select_for_update_eucountry"."country_ptr_id']
    
  158.         else:
    
  159.             expected = ["select_for_update_eucountry"]
    
  160.         expected = [connection.ops.quote_name(value) for value in expected]
    
  161.         self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected))
    
  162. 
    
  163.     @skipUnlessDBFeature("has_select_for_update_of")
    
  164.     def test_for_update_sql_model_inheritance_ptr_generated_of(self):
    
  165.         with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
    
  166.             list(
    
  167.                 EUCountry.objects.select_for_update(
    
  168.                     of=(
    
  169.                         "self",
    
  170.                         "country_ptr",
    
  171.                     )
    
  172.                 )
    
  173.             )
    
  174.         if connection.features.select_for_update_of_column:
    
  175.             expected = [
    
  176.                 'select_for_update_eucountry"."country_ptr_id',
    
  177.                 'select_for_update_country"."entity_ptr_id',
    
  178.             ]
    
  179.         else:
    
  180.             expected = ["select_for_update_eucountry", "select_for_update_country"]
    
  181.         expected = [connection.ops.quote_name(value) for value in expected]
    
  182.         self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected))
    
  183. 
    
  184.     @skipUnlessDBFeature("has_select_for_update_of")
    
  185.     def test_for_update_sql_related_model_inheritance_generated_of(self):
    
  186.         with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
    
  187.             list(
    
  188.                 EUCity.objects.select_related("country").select_for_update(
    
  189.                     of=("self", "country"),
    
  190.                 )
    
  191.             )
    
  192.         if connection.features.select_for_update_of_column:
    
  193.             expected = [
    
  194.                 'select_for_update_eucity"."id',
    
  195.                 'select_for_update_eucountry"."country_ptr_id',
    
  196.             ]
    
  197.         else:
    
  198.             expected = ["select_for_update_eucity", "select_for_update_eucountry"]
    
  199.         expected = [connection.ops.quote_name(value) for value in expected]
    
  200.         self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected))
    
  201. 
    
  202.     @skipUnlessDBFeature("has_select_for_update_of")
    
  203.     def test_for_update_sql_model_inheritance_nested_ptr_generated_of(self):
    
  204.         with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
    
  205.             list(
    
  206.                 EUCity.objects.select_related("country").select_for_update(
    
  207.                     of=(
    
  208.                         "self",
    
  209.                         "country__country_ptr",
    
  210.                     ),
    
  211.                 )
    
  212.             )
    
  213.         if connection.features.select_for_update_of_column:
    
  214.             expected = [
    
  215.                 'select_for_update_eucity"."id',
    
  216.                 'select_for_update_country"."entity_ptr_id',
    
  217.             ]
    
  218.         else:
    
  219.             expected = ["select_for_update_eucity", "select_for_update_country"]
    
  220.         expected = [connection.ops.quote_name(value) for value in expected]
    
  221.         self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected))
    
  222. 
    
  223.     @skipUnlessDBFeature("has_select_for_update_of")
    
  224.     def test_for_update_sql_multilevel_model_inheritance_ptr_generated_of(self):
    
  225.         with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
    
  226.             list(
    
  227.                 EUCountry.objects.select_for_update(
    
  228.                     of=("country_ptr", "country_ptr__entity_ptr"),
    
  229.                 )
    
  230.             )
    
  231.         if connection.features.select_for_update_of_column:
    
  232.             expected = [
    
  233.                 'select_for_update_country"."entity_ptr_id',
    
  234.                 'select_for_update_entity"."id',
    
  235.             ]
    
  236.         else:
    
  237.             expected = ["select_for_update_country", "select_for_update_entity"]
    
  238.         expected = [connection.ops.quote_name(value) for value in expected]
    
  239.         self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected))
    
  240. 
    
  241.     @skipUnlessDBFeature("has_select_for_update_of")
    
  242.     def test_for_update_sql_model_proxy_generated_of(self):
    
  243.         with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
    
  244.             list(
    
  245.                 CityCountryProxy.objects.select_related(
    
  246.                     "country",
    
  247.                 ).select_for_update(
    
  248.                     of=("country",),
    
  249.                 )
    
  250.             )
    
  251.         if connection.features.select_for_update_of_column:
    
  252.             expected = ['select_for_update_country"."entity_ptr_id']
    
  253.         else:
    
  254.             expected = ["select_for_update_country"]
    
  255.         expected = [connection.ops.quote_name(value) for value in expected]
    
  256.         self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected))
    
  257. 
    
  258.     @skipUnlessDBFeature("has_select_for_update_of")
    
  259.     def test_for_update_of_followed_by_values(self):
    
  260.         with transaction.atomic():
    
  261.             values = list(Person.objects.select_for_update(of=("self",)).values("pk"))
    
  262.         self.assertEqual(values, [{"pk": self.person.pk}])
    
  263. 
    
  264.     @skipUnlessDBFeature("has_select_for_update_of")
    
  265.     def test_for_update_of_followed_by_values_list(self):
    
  266.         with transaction.atomic():
    
  267.             values = list(
    
  268.                 Person.objects.select_for_update(of=("self",)).values_list("pk")
    
  269.             )
    
  270.         self.assertEqual(values, [(self.person.pk,)])
    
  271. 
    
  272.     @skipUnlessDBFeature("has_select_for_update_of")
    
  273.     def test_for_update_of_self_when_self_is_not_selected(self):
    
  274.         """
    
  275.         select_for_update(of=['self']) when the only columns selected are from
    
  276.         related tables.
    
  277.         """
    
  278.         with transaction.atomic():
    
  279.             values = list(
    
  280.                 Person.objects.select_related("born")
    
  281.                 .select_for_update(of=("self",))
    
  282.                 .values("born__name")
    
  283.             )
    
  284.         self.assertEqual(values, [{"born__name": self.city1.name}])
    
  285. 
    
  286.     @skipUnlessDBFeature(
    
  287.         "has_select_for_update_of",
    
  288.         "supports_select_for_update_with_limit",
    
  289.     )
    
  290.     def test_for_update_of_with_exists(self):
    
  291.         with transaction.atomic():
    
  292.             qs = Person.objects.select_for_update(of=("self", "born"))
    
  293.             self.assertIs(qs.exists(), True)
    
  294. 
    
  295.     @skipUnlessDBFeature("has_select_for_update_nowait", "supports_transactions")
    
  296.     def test_nowait_raises_error_on_block(self):
    
  297.         """
    
  298.         If nowait is specified, we expect an error to be raised rather
    
  299.         than blocking.
    
  300.         """
    
  301.         self.start_blocking_transaction()
    
  302.         status = []
    
  303. 
    
  304.         thread = threading.Thread(
    
  305.             target=self.run_select_for_update,
    
  306.             args=(status,),
    
  307.             kwargs={"nowait": True},
    
  308.         )
    
  309. 
    
  310.         thread.start()
    
  311.         time.sleep(1)
    
  312.         thread.join()
    
  313.         self.end_blocking_transaction()
    
  314.         self.assertIsInstance(status[-1], DatabaseError)
    
  315. 
    
  316.     @skipUnlessDBFeature("has_select_for_update_skip_locked", "supports_transactions")
    
  317.     def test_skip_locked_skips_locked_rows(self):
    
  318.         """
    
  319.         If skip_locked is specified, the locked row is skipped resulting in
    
  320.         Person.DoesNotExist.
    
  321.         """
    
  322.         self.start_blocking_transaction()
    
  323.         status = []
    
  324.         thread = threading.Thread(
    
  325.             target=self.run_select_for_update,
    
  326.             args=(status,),
    
  327.             kwargs={"skip_locked": True},
    
  328.         )
    
  329.         thread.start()
    
  330.         time.sleep(1)
    
  331.         thread.join()
    
  332.         self.end_blocking_transaction()
    
  333.         self.assertIsInstance(status[-1], Person.DoesNotExist)
    
  334. 
    
  335.     @skipIfDBFeature("has_select_for_update_nowait")
    
  336.     @skipUnlessDBFeature("has_select_for_update")
    
  337.     def test_unsupported_nowait_raises_error(self):
    
  338.         """
    
  339.         NotSupportedError is raised if a SELECT...FOR UPDATE NOWAIT is run on
    
  340.         a database backend that supports FOR UPDATE but not NOWAIT.
    
  341.         """
    
  342.         with self.assertRaisesMessage(
    
  343.             NotSupportedError, "NOWAIT is not supported on this database backend."
    
  344.         ):
    
  345.             with transaction.atomic():
    
  346.                 Person.objects.select_for_update(nowait=True).get()
    
  347. 
    
  348.     @skipIfDBFeature("has_select_for_update_skip_locked")
    
  349.     @skipUnlessDBFeature("has_select_for_update")
    
  350.     def test_unsupported_skip_locked_raises_error(self):
    
  351.         """
    
  352.         NotSupportedError is raised if a SELECT...FOR UPDATE SKIP LOCKED is run
    
  353.         on a database backend that supports FOR UPDATE but not SKIP LOCKED.
    
  354.         """
    
  355.         with self.assertRaisesMessage(
    
  356.             NotSupportedError, "SKIP LOCKED is not supported on this database backend."
    
  357.         ):
    
  358.             with transaction.atomic():
    
  359.                 Person.objects.select_for_update(skip_locked=True).get()
    
  360. 
    
  361.     @skipIfDBFeature("has_select_for_update_of")
    
  362.     @skipUnlessDBFeature("has_select_for_update")
    
  363.     def test_unsupported_of_raises_error(self):
    
  364.         """
    
  365.         NotSupportedError is raised if a SELECT...FOR UPDATE OF... is run on
    
  366.         a database backend that supports FOR UPDATE but not OF.
    
  367.         """
    
  368.         msg = "FOR UPDATE OF is not supported on this database backend."
    
  369.         with self.assertRaisesMessage(NotSupportedError, msg):
    
  370.             with transaction.atomic():
    
  371.                 Person.objects.select_for_update(of=("self",)).get()
    
  372. 
    
  373.     @skipIfDBFeature("has_select_for_no_key_update")
    
  374.     @skipUnlessDBFeature("has_select_for_update")
    
  375.     def test_unsuported_no_key_raises_error(self):
    
  376.         """
    
  377.         NotSupportedError is raised if a SELECT...FOR NO KEY UPDATE... is run
    
  378.         on a database backend that supports FOR UPDATE but not NO KEY.
    
  379.         """
    
  380.         msg = "FOR NO KEY UPDATE is not supported on this database backend."
    
  381.         with self.assertRaisesMessage(NotSupportedError, msg):
    
  382.             with transaction.atomic():
    
  383.                 Person.objects.select_for_update(no_key=True).get()
    
  384. 
    
  385.     @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of")
    
  386.     def test_unrelated_of_argument_raises_error(self):
    
  387.         """
    
  388.         FieldError is raised if a non-relation field is specified in of=(...).
    
  389.         """
    
  390.         msg = (
    
  391.             "Invalid field name(s) given in select_for_update(of=(...)): %s. "
    
  392.             "Only relational fields followed in the query are allowed. "
    
  393.             "Choices are: self, born, born__country, "
    
  394.             "born__country__entity_ptr."
    
  395.         )
    
  396.         invalid_of = [
    
  397.             ("nonexistent",),
    
  398.             ("name",),
    
  399.             ("born__nonexistent",),
    
  400.             ("born__name",),
    
  401.             ("born__nonexistent", "born__name"),
    
  402.         ]
    
  403.         for of in invalid_of:
    
  404.             with self.subTest(of=of):
    
  405.                 with self.assertRaisesMessage(FieldError, msg % ", ".join(of)):
    
  406.                     with transaction.atomic():
    
  407.                         Person.objects.select_related(
    
  408.                             "born__country"
    
  409.                         ).select_for_update(of=of).get()
    
  410. 
    
  411.     @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of")
    
  412.     def test_related_but_unselected_of_argument_raises_error(self):
    
  413.         """
    
  414.         FieldError is raised if a relation field that is not followed in the
    
  415.         query is specified in of=(...).
    
  416.         """
    
  417.         msg = (
    
  418.             "Invalid field name(s) given in select_for_update(of=(...)): %s. "
    
  419.             "Only relational fields followed in the query are allowed. "
    
  420.             "Choices are: self, born, profile."
    
  421.         )
    
  422.         for name in ["born__country", "died", "died__country"]:
    
  423.             with self.subTest(name=name):
    
  424.                 with self.assertRaisesMessage(FieldError, msg % name):
    
  425.                     with transaction.atomic():
    
  426.                         Person.objects.select_related(
    
  427.                             "born",
    
  428.                             "profile",
    
  429.                         ).exclude(
    
  430.                             profile=None
    
  431.                         ).select_for_update(of=(name,)).get()
    
  432. 
    
  433.     @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of")
    
  434.     def test_model_inheritance_of_argument_raises_error_ptr_in_choices(self):
    
  435.         msg = (
    
  436.             "Invalid field name(s) given in select_for_update(of=(...)): "
    
  437.             "name. Only relational fields followed in the query are allowed. "
    
  438.             "Choices are: self, %s."
    
  439.         )
    
  440.         with self.assertRaisesMessage(
    
  441.             FieldError,
    
  442.             msg % "country, country__country_ptr, country__country_ptr__entity_ptr",
    
  443.         ):
    
  444.             with transaction.atomic():
    
  445.                 EUCity.objects.select_related(
    
  446.                     "country",
    
  447.                 ).select_for_update(of=("name",)).get()
    
  448.         with self.assertRaisesMessage(
    
  449.             FieldError, msg % "country_ptr, country_ptr__entity_ptr"
    
  450.         ):
    
  451.             with transaction.atomic():
    
  452.                 EUCountry.objects.select_for_update(of=("name",)).get()
    
  453. 
    
  454.     @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of")
    
  455.     def test_model_proxy_of_argument_raises_error_proxy_field_in_choices(self):
    
  456.         msg = (
    
  457.             "Invalid field name(s) given in select_for_update(of=(...)): "
    
  458.             "name. Only relational fields followed in the query are allowed. "
    
  459.             "Choices are: self, country, country__entity_ptr."
    
  460.         )
    
  461.         with self.assertRaisesMessage(FieldError, msg):
    
  462.             with transaction.atomic():
    
  463.                 CityCountryProxy.objects.select_related(
    
  464.                     "country",
    
  465.                 ).select_for_update(of=("name",)).get()
    
  466. 
    
  467.     @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of")
    
  468.     def test_reverse_one_to_one_of_arguments(self):
    
  469.         """
    
  470.         Reverse OneToOneFields may be included in of=(...) as long as NULLs
    
  471.         are excluded because LEFT JOIN isn't allowed in SELECT FOR UPDATE.
    
  472.         """
    
  473.         with transaction.atomic():
    
  474.             person = (
    
  475.                 Person.objects.select_related(
    
  476.                     "profile",
    
  477.                 )
    
  478.                 .exclude(profile=None)
    
  479.                 .select_for_update(of=("profile",))
    
  480.                 .get()
    
  481.             )
    
  482.             self.assertEqual(person.profile, self.person_profile)
    
  483. 
    
  484.     @skipUnlessDBFeature("has_select_for_update")
    
  485.     def test_for_update_after_from(self):
    
  486.         features_class = connections["default"].features.__class__
    
  487.         attribute_to_patch = "%s.%s.for_update_after_from" % (
    
  488.             features_class.__module__,
    
  489.             features_class.__name__,
    
  490.         )
    
  491.         with mock.patch(attribute_to_patch, return_value=True):
    
  492.             with transaction.atomic():
    
  493.                 self.assertIn(
    
  494.                     "FOR UPDATE WHERE",
    
  495.                     str(Person.objects.filter(name="foo").select_for_update().query),
    
  496.                 )
    
  497. 
    
  498.     @skipUnlessDBFeature("has_select_for_update", "supports_transactions")
    
  499.     def test_for_update_requires_transaction(self):
    
  500.         """
    
  501.         A TransactionManagementError is raised
    
  502.         when a select_for_update query is executed outside of a transaction.
    
  503.         """
    
  504.         msg = "select_for_update cannot be used outside of a transaction."
    
  505.         with self.assertRaisesMessage(transaction.TransactionManagementError, msg):
    
  506.             list(Person.objects.select_for_update())
    
  507. 
    
  508.     @skipUnlessDBFeature("has_select_for_update", "supports_transactions")
    
  509.     def test_for_update_requires_transaction_only_in_execution(self):
    
  510.         """
    
  511.         No TransactionManagementError is raised
    
  512.         when select_for_update is invoked outside of a transaction -
    
  513.         only when the query is executed.
    
  514.         """
    
  515.         people = Person.objects.select_for_update()
    
  516.         msg = "select_for_update cannot be used outside of a transaction."
    
  517.         with self.assertRaisesMessage(transaction.TransactionManagementError, msg):
    
  518.             list(people)
    
  519. 
    
  520.     @skipUnlessDBFeature("supports_select_for_update_with_limit")
    
  521.     def test_select_for_update_with_limit(self):
    
  522.         other = Person.objects.create(name="Grappeli", born=self.city1, died=self.city2)
    
  523.         with transaction.atomic():
    
  524.             qs = list(Person.objects.order_by("pk").select_for_update()[1:2])
    
  525.             self.assertEqual(qs[0], other)
    
  526. 
    
  527.     @skipIfDBFeature("supports_select_for_update_with_limit")
    
  528.     def test_unsupported_select_for_update_with_limit(self):
    
  529.         msg = (
    
  530.             "LIMIT/OFFSET is not supported with select_for_update on this database "
    
  531.             "backend."
    
  532.         )
    
  533.         with self.assertRaisesMessage(NotSupportedError, msg):
    
  534.             with transaction.atomic():
    
  535.                 list(Person.objects.order_by("pk").select_for_update()[1:2])
    
  536. 
    
  537.     def run_select_for_update(self, status, **kwargs):
    
  538.         """
    
  539.         Utility method that runs a SELECT FOR UPDATE against all
    
  540.         Person instances. After the select_for_update, it attempts
    
  541.         to update the name of the only record, save, and commit.
    
  542. 
    
  543.         This function expects to run in a separate thread.
    
  544.         """
    
  545.         status.append("started")
    
  546.         try:
    
  547.             # We need to enter transaction management again, as this is done on
    
  548.             # per-thread basis
    
  549.             with transaction.atomic():
    
  550.                 person = Person.objects.select_for_update(**kwargs).get()
    
  551.                 person.name = "Fred"
    
  552.                 person.save()
    
  553.         except (DatabaseError, Person.DoesNotExist) as e:
    
  554.             status.append(e)
    
  555.         finally:
    
  556.             # This method is run in a separate thread. It uses its own
    
  557.             # database connection. Close it without waiting for the GC.
    
  558.             connection.close()
    
  559. 
    
  560.     @skipUnlessDBFeature("has_select_for_update")
    
  561.     @skipUnlessDBFeature("supports_transactions")
    
  562.     def test_block(self):
    
  563.         """
    
  564.         A thread running a select_for_update that accesses rows being touched
    
  565.         by a similar operation on another connection blocks correctly.
    
  566.         """
    
  567.         # First, let's start the transaction in our thread.
    
  568.         self.start_blocking_transaction()
    
  569. 
    
  570.         # Now, try it again using the ORM's select_for_update
    
  571.         # facility. Do this in a separate thread.
    
  572.         status = []
    
  573.         thread = threading.Thread(target=self.run_select_for_update, args=(status,))
    
  574. 
    
  575.         # The thread should immediately block, but we'll sleep
    
  576.         # for a bit to make sure.
    
  577.         thread.start()
    
  578.         sanity_count = 0
    
  579.         while len(status) != 1 and sanity_count < 10:
    
  580.             sanity_count += 1
    
  581.             time.sleep(1)
    
  582.         if sanity_count >= 10:
    
  583.             raise ValueError("Thread did not run and block")
    
  584. 
    
  585.         # Check the person hasn't been updated. Since this isn't
    
  586.         # using FOR UPDATE, it won't block.
    
  587.         p = Person.objects.get(pk=self.person.pk)
    
  588.         self.assertEqual("Reinhardt", p.name)
    
  589. 
    
  590.         # When we end our blocking transaction, our thread should
    
  591.         # be able to continue.
    
  592.         self.end_blocking_transaction()
    
  593.         thread.join(5.0)
    
  594. 
    
  595.         # Check the thread has finished. Assuming it has, we should
    
  596.         # find that it has updated the person's name.
    
  597.         self.assertFalse(thread.is_alive())
    
  598. 
    
  599.         # We must commit the transaction to ensure that MySQL gets a fresh read,
    
  600.         # since by default it runs in REPEATABLE READ mode
    
  601.         transaction.commit()
    
  602. 
    
  603.         p = Person.objects.get(pk=self.person.pk)
    
  604.         self.assertEqual("Fred", p.name)
    
  605. 
    
  606.     @skipUnlessDBFeature("has_select_for_update", "supports_transactions")
    
  607.     def test_raw_lock_not_available(self):
    
  608.         """
    
  609.         Running a raw query which can't obtain a FOR UPDATE lock raises
    
  610.         the correct exception
    
  611.         """
    
  612.         self.start_blocking_transaction()
    
  613. 
    
  614.         def raw(status):
    
  615.             try:
    
  616.                 list(
    
  617.                     Person.objects.raw(
    
  618.                         "SELECT * FROM %s %s"
    
  619.                         % (
    
  620.                             Person._meta.db_table,
    
  621.                             connection.ops.for_update_sql(nowait=True),
    
  622.                         )
    
  623.                     )
    
  624.                 )
    
  625.             except DatabaseError as e:
    
  626.                 status.append(e)
    
  627.             finally:
    
  628.                 # This method is run in a separate thread. It uses its own
    
  629.                 # database connection. Close it without waiting for the GC.
    
  630.                 # Connection cannot be closed on Oracle because cursor is still
    
  631.                 # open.
    
  632.                 if connection.vendor != "oracle":
    
  633.                     connection.close()
    
  634. 
    
  635.         status = []
    
  636.         thread = threading.Thread(target=raw, kwargs={"status": status})
    
  637.         thread.start()
    
  638.         time.sleep(1)
    
  639.         thread.join()
    
  640.         self.end_blocking_transaction()
    
  641.         self.assertIsInstance(status[-1], DatabaseError)
    
  642. 
    
  643.     @skipUnlessDBFeature("has_select_for_update")
    
  644.     @override_settings(DATABASE_ROUTERS=[TestRouter()])
    
  645.     def test_select_for_update_on_multidb(self):
    
  646.         query = Person.objects.select_for_update()
    
  647.         self.assertEqual(router.db_for_write(Person), query.db)
    
  648. 
    
  649.     @skipUnlessDBFeature("has_select_for_update")
    
  650.     def test_select_for_update_with_get(self):
    
  651.         with transaction.atomic():
    
  652.             person = Person.objects.select_for_update().get(name="Reinhardt")
    
  653.         self.assertEqual(person.name, "Reinhardt")
    
  654. 
    
  655.     def test_nowait_and_skip_locked(self):
    
  656.         with self.assertRaisesMessage(
    
  657.             ValueError, "The nowait option cannot be used with skip_locked."
    
  658.         ):
    
  659.             Person.objects.select_for_update(nowait=True, skip_locked=True)
    
  660. 
    
  661.     def test_ordered_select_for_update(self):
    
  662.         """
    
  663.         Subqueries should respect ordering as an ORDER BY clause may be useful
    
  664.         to specify a row locking order to prevent deadlocks (#27193).
    
  665.         """
    
  666.         with transaction.atomic():
    
  667.             qs = Person.objects.filter(
    
  668.                 id__in=Person.objects.order_by("-id").select_for_update()
    
  669.             )
    
  670.             self.assertIn("ORDER BY", str(qs.query))