1. from datetime import date
    
  2. from decimal import Decimal
    
  3. from unittest import mock
    
  4. 
    
  5. from django.db import connection, transaction
    
  6. from django.db.models import (
    
  7.     Case,
    
  8.     Count,
    
  9.     DecimalField,
    
  10.     F,
    
  11.     FilteredRelation,
    
  12.     Q,
    
  13.     Sum,
    
  14.     When,
    
  15. )
    
  16. from django.test import TestCase
    
  17. from django.test.testcases import skipUnlessDBFeature
    
  18. 
    
  19. from .models import (
    
  20.     Author,
    
  21.     Book,
    
  22.     BookDailySales,
    
  23.     Borrower,
    
  24.     Currency,
    
  25.     Editor,
    
  26.     ExchangeRate,
    
  27.     RentalSession,
    
  28.     Reservation,
    
  29.     Seller,
    
  30. )
    
  31. 
    
  32. 
    
  33. class FilteredRelationTests(TestCase):
    
  34.     @classmethod
    
  35.     def setUpTestData(cls):
    
  36.         cls.author1 = Author.objects.create(name="Alice")
    
  37.         cls.author2 = Author.objects.create(name="Jane")
    
  38.         cls.editor_a = Editor.objects.create(name="a")
    
  39.         cls.editor_b = Editor.objects.create(name="b")
    
  40.         cls.book1 = Book.objects.create(
    
  41.             title="Poem by Alice",
    
  42.             editor=cls.editor_a,
    
  43.             author=cls.author1,
    
  44.         )
    
  45.         cls.book1.generic_author.set([cls.author2])
    
  46.         cls.book2 = Book.objects.create(
    
  47.             title="The book by Jane A",
    
  48.             editor=cls.editor_b,
    
  49.             author=cls.author2,
    
  50.         )
    
  51.         cls.book3 = Book.objects.create(
    
  52.             title="The book by Jane B",
    
  53.             editor=cls.editor_b,
    
  54.             author=cls.author2,
    
  55.         )
    
  56.         cls.book4 = Book.objects.create(
    
  57.             title="The book by Alice",
    
  58.             editor=cls.editor_a,
    
  59.             author=cls.author1,
    
  60.         )
    
  61.         cls.author1.favorite_books.add(cls.book2)
    
  62.         cls.author1.favorite_books.add(cls.book3)
    
  63. 
    
  64.     def test_select_related(self):
    
  65.         qs = (
    
  66.             Author.objects.annotate(
    
  67.                 book_join=FilteredRelation("book"),
    
  68.             )
    
  69.             .select_related("book_join__editor")
    
  70.             .order_by("pk", "book_join__pk")
    
  71.         )
    
  72.         with self.assertNumQueries(1):
    
  73.             self.assertQuerysetEqual(
    
  74.                 qs,
    
  75.                 [
    
  76.                     (self.author1, self.book1, self.editor_a, self.author1),
    
  77.                     (self.author1, self.book4, self.editor_a, self.author1),
    
  78.                     (self.author2, self.book2, self.editor_b, self.author2),
    
  79.                     (self.author2, self.book3, self.editor_b, self.author2),
    
  80.                 ],
    
  81.                 lambda x: (x, x.book_join, x.book_join.editor, x.book_join.author),
    
  82.             )
    
  83. 
    
  84.     def test_select_related_multiple(self):
    
  85.         qs = (
    
  86.             Book.objects.annotate(
    
  87.                 author_join=FilteredRelation("author"),
    
  88.                 editor_join=FilteredRelation("editor"),
    
  89.             )
    
  90.             .select_related("author_join", "editor_join")
    
  91.             .order_by("pk")
    
  92.         )
    
  93.         self.assertQuerysetEqual(
    
  94.             qs,
    
  95.             [
    
  96.                 (self.book1, self.author1, self.editor_a),
    
  97.                 (self.book2, self.author2, self.editor_b),
    
  98.                 (self.book3, self.author2, self.editor_b),
    
  99.                 (self.book4, self.author1, self.editor_a),
    
  100.             ],
    
  101.             lambda x: (x, x.author_join, x.editor_join),
    
  102.         )
    
  103. 
    
  104.     def test_select_related_with_empty_relation(self):
    
  105.         qs = (
    
  106.             Author.objects.annotate(
    
  107.                 book_join=FilteredRelation("book", condition=Q(pk=-1)),
    
  108.             )
    
  109.             .select_related("book_join")
    
  110.             .order_by("pk")
    
  111.         )
    
  112.         self.assertSequenceEqual(qs, [self.author1, self.author2])
    
  113. 
    
  114.     def test_select_related_foreign_key(self):
    
  115.         qs = (
    
  116.             Book.objects.annotate(
    
  117.                 author_join=FilteredRelation("author"),
    
  118.             )
    
  119.             .select_related("author_join")
    
  120.             .order_by("pk")
    
  121.         )
    
  122.         with self.assertNumQueries(1):
    
  123.             self.assertQuerysetEqual(
    
  124.                 qs,
    
  125.                 [
    
  126.                     (self.book1, self.author1),
    
  127.                     (self.book2, self.author2),
    
  128.                     (self.book3, self.author2),
    
  129.                     (self.book4, self.author1),
    
  130.                 ],
    
  131.                 lambda x: (x, x.author_join),
    
  132.             )
    
  133. 
    
  134.     @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of")
    
  135.     def test_select_related_foreign_key_for_update_of(self):
    
  136.         with transaction.atomic():
    
  137.             qs = (
    
  138.                 Book.objects.annotate(
    
  139.                     author_join=FilteredRelation("author"),
    
  140.                 )
    
  141.                 .select_related("author_join")
    
  142.                 .select_for_update(of=("self",))
    
  143.                 .order_by("pk")
    
  144.             )
    
  145.             with self.assertNumQueries(1):
    
  146.                 self.assertQuerysetEqual(
    
  147.                     qs,
    
  148.                     [
    
  149.                         (self.book1, self.author1),
    
  150.                         (self.book2, self.author2),
    
  151.                         (self.book3, self.author2),
    
  152.                         (self.book4, self.author1),
    
  153.                     ],
    
  154.                     lambda x: (x, x.author_join),
    
  155.                 )
    
  156. 
    
  157.     def test_without_join(self):
    
  158.         self.assertCountEqual(
    
  159.             Author.objects.annotate(
    
  160.                 book_alice=FilteredRelation(
    
  161.                     "book", condition=Q(book__title__iexact="poem by alice")
    
  162.                 ),
    
  163.             ),
    
  164.             [self.author1, self.author2],
    
  165.         )
    
  166. 
    
  167.     def test_with_join(self):
    
  168.         self.assertSequenceEqual(
    
  169.             Author.objects.annotate(
    
  170.                 book_alice=FilteredRelation(
    
  171.                     "book", condition=Q(book__title__iexact="poem by alice")
    
  172.                 ),
    
  173.             ).filter(book_alice__isnull=False),
    
  174.             [self.author1],
    
  175.         )
    
  176. 
    
  177.     def test_with_exclude(self):
    
  178.         self.assertSequenceEqual(
    
  179.             Author.objects.annotate(
    
  180.                 book_alice=FilteredRelation(
    
  181.                     "book", condition=Q(book__title__iexact="poem by alice")
    
  182.                 ),
    
  183.             ).exclude(book_alice__isnull=False),
    
  184.             [self.author2],
    
  185.         )
    
  186. 
    
  187.     def test_with_join_and_complex_condition(self):
    
  188.         self.assertSequenceEqual(
    
  189.             Author.objects.annotate(
    
  190.                 book_alice=FilteredRelation(
    
  191.                     "book",
    
  192.                     condition=Q(
    
  193.                         Q(book__title__iexact="poem by alice")
    
  194.                         | Q(book__state=Book.RENTED)
    
  195.                     ),
    
  196.                 ),
    
  197.             ).filter(book_alice__isnull=False),
    
  198.             [self.author1],
    
  199.         )
    
  200. 
    
  201.     def test_internal_queryset_alias_mapping(self):
    
  202.         queryset = Author.objects.annotate(
    
  203.             book_alice=FilteredRelation(
    
  204.                 "book", condition=Q(book__title__iexact="poem by alice")
    
  205.             ),
    
  206.         ).filter(book_alice__isnull=False)
    
  207.         self.assertIn(
    
  208.             "INNER JOIN {} book_alice ON".format(
    
  209.                 connection.ops.quote_name("filtered_relation_book")
    
  210.             ),
    
  211.             str(queryset.query),
    
  212.         )
    
  213. 
    
  214.     def test_multiple(self):
    
  215.         qs = (
    
  216.             Author.objects.annotate(
    
  217.                 book_title_alice=FilteredRelation(
    
  218.                     "book", condition=Q(book__title__contains="Alice")
    
  219.                 ),
    
  220.                 book_title_jane=FilteredRelation(
    
  221.                     "book", condition=Q(book__title__icontains="Jane")
    
  222.                 ),
    
  223.             )
    
  224.             .filter(name="Jane")
    
  225.             .values("book_title_alice__title", "book_title_jane__title")
    
  226.         )
    
  227.         empty = "" if connection.features.interprets_empty_strings_as_nulls else None
    
  228.         self.assertCountEqual(
    
  229.             qs,
    
  230.             [
    
  231.                 {
    
  232.                     "book_title_alice__title": empty,
    
  233.                     "book_title_jane__title": "The book by Jane A",
    
  234.                 },
    
  235.                 {
    
  236.                     "book_title_alice__title": empty,
    
  237.                     "book_title_jane__title": "The book by Jane B",
    
  238.                 },
    
  239.             ],
    
  240.         )
    
  241. 
    
  242.     def test_with_multiple_filter(self):
    
  243.         self.assertSequenceEqual(
    
  244.             Author.objects.annotate(
    
  245.                 book_editor_a=FilteredRelation(
    
  246.                     "book",
    
  247.                     condition=Q(
    
  248.                         book__title__icontains="book", book__editor_id=self.editor_a.pk
    
  249.                     ),
    
  250.                 ),
    
  251.             ).filter(book_editor_a__isnull=False),
    
  252.             [self.author1],
    
  253.         )
    
  254. 
    
  255.     def test_multiple_times(self):
    
  256.         self.assertSequenceEqual(
    
  257.             Author.objects.annotate(
    
  258.                 book_title_alice=FilteredRelation(
    
  259.                     "book", condition=Q(book__title__icontains="alice")
    
  260.                 ),
    
  261.             )
    
  262.             .filter(book_title_alice__isnull=False)
    
  263.             .filter(book_title_alice__isnull=False)
    
  264.             .distinct(),
    
  265.             [self.author1],
    
  266.         )
    
  267. 
    
  268.     def test_exclude_relation_with_join(self):
    
  269.         self.assertSequenceEqual(
    
  270.             Author.objects.annotate(
    
  271.                 book_alice=FilteredRelation(
    
  272.                     "book", condition=~Q(book__title__icontains="alice")
    
  273.                 ),
    
  274.             )
    
  275.             .filter(book_alice__isnull=False)
    
  276.             .distinct(),
    
  277.             [self.author2],
    
  278.         )
    
  279. 
    
  280.     def test_with_m2m(self):
    
  281.         qs = Author.objects.annotate(
    
  282.             favorite_books_written_by_jane=FilteredRelation(
    
  283.                 "favorite_books",
    
  284.                 condition=Q(favorite_books__in=[self.book2]),
    
  285.             ),
    
  286.         ).filter(favorite_books_written_by_jane__isnull=False)
    
  287.         self.assertSequenceEqual(qs, [self.author1])
    
  288. 
    
  289.     def test_with_m2m_deep(self):
    
  290.         qs = Author.objects.annotate(
    
  291.             favorite_books_written_by_jane=FilteredRelation(
    
  292.                 "favorite_books",
    
  293.                 condition=Q(favorite_books__author=self.author2),
    
  294.             ),
    
  295.         ).filter(favorite_books_written_by_jane__title="The book by Jane B")
    
  296.         self.assertSequenceEqual(qs, [self.author1])
    
  297. 
    
  298.     def test_with_m2m_multijoin(self):
    
  299.         qs = (
    
  300.             Author.objects.annotate(
    
  301.                 favorite_books_written_by_jane=FilteredRelation(
    
  302.                     "favorite_books",
    
  303.                     condition=Q(favorite_books__author=self.author2),
    
  304.                 )
    
  305.             )
    
  306.             .filter(favorite_books_written_by_jane__editor__name="b")
    
  307.             .distinct()
    
  308.         )
    
  309.         self.assertSequenceEqual(qs, [self.author1])
    
  310. 
    
  311.     def test_values_list(self):
    
  312.         self.assertSequenceEqual(
    
  313.             Author.objects.annotate(
    
  314.                 book_alice=FilteredRelation(
    
  315.                     "book", condition=Q(book__title__iexact="poem by alice")
    
  316.                 ),
    
  317.             )
    
  318.             .filter(book_alice__isnull=False)
    
  319.             .values_list("book_alice__title", flat=True),
    
  320.             ["Poem by Alice"],
    
  321.         )
    
  322. 
    
  323.     def test_values(self):
    
  324.         self.assertSequenceEqual(
    
  325.             Author.objects.annotate(
    
  326.                 book_alice=FilteredRelation(
    
  327.                     "book", condition=Q(book__title__iexact="poem by alice")
    
  328.                 ),
    
  329.             )
    
  330.             .filter(book_alice__isnull=False)
    
  331.             .values(),
    
  332.             [
    
  333.                 {
    
  334.                     "id": self.author1.pk,
    
  335.                     "name": "Alice",
    
  336.                     "content_type_id": None,
    
  337.                     "object_id": None,
    
  338.                 }
    
  339.             ],
    
  340.         )
    
  341. 
    
  342.     def test_extra(self):
    
  343.         self.assertSequenceEqual(
    
  344.             Author.objects.annotate(
    
  345.                 book_alice=FilteredRelation(
    
  346.                     "book", condition=Q(book__title__iexact="poem by alice")
    
  347.                 ),
    
  348.             )
    
  349.             .filter(book_alice__isnull=False)
    
  350.             .extra(where=["1 = 1"]),
    
  351.             [self.author1],
    
  352.         )
    
  353. 
    
  354.     @skipUnlessDBFeature("supports_select_union")
    
  355.     def test_union(self):
    
  356.         qs1 = Author.objects.annotate(
    
  357.             book_alice=FilteredRelation(
    
  358.                 "book", condition=Q(book__title__iexact="poem by alice")
    
  359.             ),
    
  360.         ).filter(book_alice__isnull=False)
    
  361.         qs2 = Author.objects.annotate(
    
  362.             book_jane=FilteredRelation(
    
  363.                 "book", condition=Q(book__title__iexact="the book by jane a")
    
  364.             ),
    
  365.         ).filter(book_jane__isnull=False)
    
  366.         self.assertSequenceEqual(qs1.union(qs2), [self.author1, self.author2])
    
  367. 
    
  368.     @skipUnlessDBFeature("supports_select_intersection")
    
  369.     def test_intersection(self):
    
  370.         qs1 = Author.objects.annotate(
    
  371.             book_alice=FilteredRelation(
    
  372.                 "book", condition=Q(book__title__iexact="poem by alice")
    
  373.             ),
    
  374.         ).filter(book_alice__isnull=False)
    
  375.         qs2 = Author.objects.annotate(
    
  376.             book_jane=FilteredRelation(
    
  377.                 "book", condition=Q(book__title__iexact="the book by jane a")
    
  378.             ),
    
  379.         ).filter(book_jane__isnull=False)
    
  380.         self.assertSequenceEqual(qs1.intersection(qs2), [])
    
  381. 
    
  382.     @skipUnlessDBFeature("supports_select_difference")
    
  383.     def test_difference(self):
    
  384.         qs1 = Author.objects.annotate(
    
  385.             book_alice=FilteredRelation(
    
  386.                 "book", condition=Q(book__title__iexact="poem by alice")
    
  387.             ),
    
  388.         ).filter(book_alice__isnull=False)
    
  389.         qs2 = Author.objects.annotate(
    
  390.             book_jane=FilteredRelation(
    
  391.                 "book", condition=Q(book__title__iexact="the book by jane a")
    
  392.             ),
    
  393.         ).filter(book_jane__isnull=False)
    
  394.         self.assertSequenceEqual(qs1.difference(qs2), [self.author1])
    
  395. 
    
  396.     def test_select_for_update(self):
    
  397.         self.assertSequenceEqual(
    
  398.             Author.objects.annotate(
    
  399.                 book_jane=FilteredRelation(
    
  400.                     "book", condition=Q(book__title__iexact="the book by jane a")
    
  401.                 ),
    
  402.             )
    
  403.             .filter(book_jane__isnull=False)
    
  404.             .select_for_update(),
    
  405.             [self.author2],
    
  406.         )
    
  407. 
    
  408.     def test_defer(self):
    
  409.         # One query for the list and one query for the deferred title.
    
  410.         with self.assertNumQueries(2):
    
  411.             self.assertQuerysetEqual(
    
  412.                 Author.objects.annotate(
    
  413.                     book_alice=FilteredRelation(
    
  414.                         "book", condition=Q(book__title__iexact="poem by alice")
    
  415.                     ),
    
  416.                 )
    
  417.                 .filter(book_alice__isnull=False)
    
  418.                 .select_related("book_alice")
    
  419.                 .defer("book_alice__title"),
    
  420.                 ["Poem by Alice"],
    
  421.                 lambda author: author.book_alice.title,
    
  422.             )
    
  423. 
    
  424.     def test_only_not_supported(self):
    
  425.         msg = "only() is not supported with FilteredRelation."
    
  426.         with self.assertRaisesMessage(ValueError, msg):
    
  427.             Author.objects.annotate(
    
  428.                 book_alice=FilteredRelation(
    
  429.                     "book", condition=Q(book__title__iexact="poem by alice")
    
  430.                 ),
    
  431.             ).filter(book_alice__isnull=False).select_related("book_alice").only(
    
  432.                 "book_alice__state"
    
  433.             )
    
  434. 
    
  435.     def test_as_subquery(self):
    
  436.         inner_qs = Author.objects.annotate(
    
  437.             book_alice=FilteredRelation(
    
  438.                 "book", condition=Q(book__title__iexact="poem by alice")
    
  439.             ),
    
  440.         ).filter(book_alice__isnull=False)
    
  441.         qs = Author.objects.filter(id__in=inner_qs)
    
  442.         self.assertSequenceEqual(qs, [self.author1])
    
  443. 
    
  444.     def test_nested_foreign_key(self):
    
  445.         qs = (
    
  446.             Author.objects.annotate(
    
  447.                 book_editor_worked_with=FilteredRelation(
    
  448.                     "book__editor",
    
  449.                     condition=Q(book__title__icontains="book by"),
    
  450.                 ),
    
  451.             )
    
  452.             .filter(
    
  453.                 book_editor_worked_with__isnull=False,
    
  454.             )
    
  455.             .select_related(
    
  456.                 "book_editor_worked_with",
    
  457.             )
    
  458.             .order_by("pk", "book_editor_worked_with__pk")
    
  459.         )
    
  460.         with self.assertNumQueries(1):
    
  461.             self.assertQuerysetEqual(
    
  462.                 qs,
    
  463.                 [
    
  464.                     (self.author1, self.editor_a),
    
  465.                     (self.author2, self.editor_b),
    
  466.                     (self.author2, self.editor_b),
    
  467.                 ],
    
  468.                 lambda x: (x, x.book_editor_worked_with),
    
  469.             )
    
  470. 
    
  471.     def test_nested_foreign_key_nested_field(self):
    
  472.         qs = (
    
  473.             Author.objects.annotate(
    
  474.                 book_editor_worked_with=FilteredRelation(
    
  475.                     "book__editor", condition=Q(book__title__icontains="book by")
    
  476.                 ),
    
  477.             )
    
  478.             .filter(
    
  479.                 book_editor_worked_with__isnull=False,
    
  480.             )
    
  481.             .values(
    
  482.                 "name",
    
  483.                 "book_editor_worked_with__name",
    
  484.             )
    
  485.             .order_by("name", "book_editor_worked_with__name")
    
  486.             .distinct()
    
  487.         )
    
  488.         self.assertSequenceEqual(
    
  489.             qs,
    
  490.             [
    
  491.                 {
    
  492.                     "name": self.author1.name,
    
  493.                     "book_editor_worked_with__name": self.editor_a.name,
    
  494.                 },
    
  495.                 {
    
  496.                     "name": self.author2.name,
    
  497.                     "book_editor_worked_with__name": self.editor_b.name,
    
  498.                 },
    
  499.             ],
    
  500.         )
    
  501. 
    
  502.     def test_nested_foreign_key_filtered_base_object(self):
    
  503.         qs = (
    
  504.             Author.objects.annotate(
    
  505.                 alice_editors=FilteredRelation(
    
  506.                     "book__editor",
    
  507.                     condition=Q(name="Alice"),
    
  508.                 ),
    
  509.             )
    
  510.             .values(
    
  511.                 "name",
    
  512.                 "alice_editors__pk",
    
  513.             )
    
  514.             .order_by("name", "alice_editors__name")
    
  515.             .distinct()
    
  516.         )
    
  517.         self.assertSequenceEqual(
    
  518.             qs,
    
  519.             [
    
  520.                 {"name": self.author1.name, "alice_editors__pk": self.editor_a.pk},
    
  521.                 {"name": self.author2.name, "alice_editors__pk": None},
    
  522.             ],
    
  523.         )
    
  524. 
    
  525.     def test_nested_m2m_filtered(self):
    
  526.         qs = (
    
  527.             Book.objects.annotate(
    
  528.                 favorite_book=FilteredRelation(
    
  529.                     "author__favorite_books",
    
  530.                     condition=Q(author__favorite_books__title__icontains="book by"),
    
  531.                 ),
    
  532.             )
    
  533.             .values(
    
  534.                 "title",
    
  535.                 "favorite_book__pk",
    
  536.             )
    
  537.             .order_by("title", "favorite_book__title")
    
  538.         )
    
  539.         self.assertSequenceEqual(
    
  540.             qs,
    
  541.             [
    
  542.                 {"title": self.book1.title, "favorite_book__pk": self.book2.pk},
    
  543.                 {"title": self.book1.title, "favorite_book__pk": self.book3.pk},
    
  544.                 {"title": self.book4.title, "favorite_book__pk": self.book2.pk},
    
  545.                 {"title": self.book4.title, "favorite_book__pk": self.book3.pk},
    
  546.                 {"title": self.book2.title, "favorite_book__pk": None},
    
  547.                 {"title": self.book3.title, "favorite_book__pk": None},
    
  548.             ],
    
  549.         )
    
  550. 
    
  551.     def test_nested_chained_relations(self):
    
  552.         qs = (
    
  553.             Author.objects.annotate(
    
  554.                 my_books=FilteredRelation(
    
  555.                     "book",
    
  556.                     condition=Q(book__title__icontains="book by"),
    
  557.                 ),
    
  558.                 preferred_by_authors=FilteredRelation(
    
  559.                     "my_books__preferred_by_authors",
    
  560.                     condition=Q(my_books__preferred_by_authors__name="Alice"),
    
  561.                 ),
    
  562.             )
    
  563.             .annotate(
    
  564.                 author=F("name"),
    
  565.                 book_title=F("my_books__title"),
    
  566.                 preferred_by_author_pk=F("preferred_by_authors"),
    
  567.             )
    
  568.             .order_by("author", "book_title", "preferred_by_author_pk")
    
  569.         )
    
  570.         self.assertQuerysetEqual(
    
  571.             qs,
    
  572.             [
    
  573.                 ("Alice", "The book by Alice", None),
    
  574.                 ("Jane", "The book by Jane A", self.author1.pk),
    
  575.                 ("Jane", "The book by Jane B", self.author1.pk),
    
  576.             ],
    
  577.             lambda x: (x.author, x.book_title, x.preferred_by_author_pk),
    
  578.         )
    
  579. 
    
  580.     def test_deep_nested_foreign_key(self):
    
  581.         qs = (
    
  582.             Book.objects.annotate(
    
  583.                 author_favorite_book_editor=FilteredRelation(
    
  584.                     "author__favorite_books__editor",
    
  585.                     condition=Q(author__favorite_books__title__icontains="Jane A"),
    
  586.                 ),
    
  587.             )
    
  588.             .filter(
    
  589.                 author_favorite_book_editor__isnull=False,
    
  590.             )
    
  591.             .select_related(
    
  592.                 "author_favorite_book_editor",
    
  593.             )
    
  594.             .order_by("pk", "author_favorite_book_editor__pk")
    
  595.         )
    
  596.         with self.assertNumQueries(1):
    
  597.             self.assertQuerysetEqual(
    
  598.                 qs,
    
  599.                 [
    
  600.                     (self.book1, self.editor_b),
    
  601.                     (self.book4, self.editor_b),
    
  602.                 ],
    
  603.                 lambda x: (x, x.author_favorite_book_editor),
    
  604.             )
    
  605. 
    
  606.     def test_relation_name_lookup(self):
    
  607.         msg = (
    
  608.             "FilteredRelation's relation_name cannot contain lookups (got "
    
  609.             "'book__title__icontains')."
    
  610.         )
    
  611.         with self.assertRaisesMessage(ValueError, msg):
    
  612.             Author.objects.annotate(
    
  613.                 book_title=FilteredRelation(
    
  614.                     "book__title__icontains",
    
  615.                     condition=Q(book__title="Poem by Alice"),
    
  616.                 ),
    
  617.             )
    
  618. 
    
  619.     def test_condition_outside_relation_name(self):
    
  620.         msg = (
    
  621.             "FilteredRelation's condition doesn't support relations outside "
    
  622.             "the 'book__editor' (got 'book__author__name__icontains')."
    
  623.         )
    
  624.         with self.assertRaisesMessage(ValueError, msg):
    
  625.             Author.objects.annotate(
    
  626.                 book_editor=FilteredRelation(
    
  627.                     "book__editor",
    
  628.                     condition=Q(book__author__name__icontains="book"),
    
  629.                 ),
    
  630.             )
    
  631. 
    
  632.     def test_condition_deeper_relation_name(self):
    
  633.         msg = (
    
  634.             "FilteredRelation's condition doesn't support nested relations "
    
  635.             "deeper than the relation_name (got "
    
  636.             "'book__editor__name__icontains' for 'book')."
    
  637.         )
    
  638.         with self.assertRaisesMessage(ValueError, msg):
    
  639.             Author.objects.annotate(
    
  640.                 book_editor=FilteredRelation(
    
  641.                     "book",
    
  642.                     condition=Q(book__editor__name__icontains="b"),
    
  643.                 ),
    
  644.             )
    
  645. 
    
  646.     def test_with_empty_relation_name_error(self):
    
  647.         with self.assertRaisesMessage(ValueError, "relation_name cannot be empty."):
    
  648.             FilteredRelation("", condition=Q(blank=""))
    
  649. 
    
  650.     def test_with_condition_as_expression_error(self):
    
  651.         msg = "condition argument must be a Q() instance."
    
  652.         expression = Case(
    
  653.             When(book__title__iexact="poem by alice", then=True),
    
  654.             default=False,
    
  655.         )
    
  656.         with self.assertRaisesMessage(ValueError, msg):
    
  657.             FilteredRelation("book", condition=expression)
    
  658. 
    
  659.     def test_with_prefetch_related(self):
    
  660.         msg = "prefetch_related() is not supported with FilteredRelation."
    
  661.         qs = Author.objects.annotate(
    
  662.             book_title_contains_b=FilteredRelation(
    
  663.                 "book", condition=Q(book__title__icontains="b")
    
  664.             ),
    
  665.         ).filter(
    
  666.             book_title_contains_b__isnull=False,
    
  667.         )
    
  668.         with self.assertRaisesMessage(ValueError, msg):
    
  669.             qs.prefetch_related("book_title_contains_b")
    
  670.         with self.assertRaisesMessage(ValueError, msg):
    
  671.             qs.prefetch_related("book_title_contains_b__editor")
    
  672. 
    
  673.     def test_with_generic_foreign_key(self):
    
  674.         self.assertSequenceEqual(
    
  675.             Book.objects.annotate(
    
  676.                 generic_authored_book=FilteredRelation(
    
  677.                     "generic_author", condition=Q(generic_author__isnull=False)
    
  678.                 ),
    
  679.             ).filter(generic_authored_book__isnull=False),
    
  680.             [self.book1],
    
  681.         )
    
  682. 
    
  683.     def test_eq(self):
    
  684.         self.assertEqual(
    
  685.             FilteredRelation("book", condition=Q(book__title="b")), mock.ANY
    
  686.         )
    
  687. 
    
  688. 
    
  689. class FilteredRelationAggregationTests(TestCase):
    
  690.     @classmethod
    
  691.     def setUpTestData(cls):
    
  692.         cls.author1 = Author.objects.create(name="Alice")
    
  693.         cls.editor_a = Editor.objects.create(name="a")
    
  694.         cls.book1 = Book.objects.create(
    
  695.             title="Poem by Alice",
    
  696.             editor=cls.editor_a,
    
  697.             author=cls.author1,
    
  698.         )
    
  699.         cls.borrower1 = Borrower.objects.create(name="Jenny")
    
  700.         cls.borrower2 = Borrower.objects.create(name="Kevin")
    
  701.         # borrower 1 reserves, rents, and returns book1.
    
  702.         Reservation.objects.create(
    
  703.             borrower=cls.borrower1,
    
  704.             book=cls.book1,
    
  705.             state=Reservation.STOPPED,
    
  706.         )
    
  707.         RentalSession.objects.create(
    
  708.             borrower=cls.borrower1,
    
  709.             book=cls.book1,
    
  710.             state=RentalSession.STOPPED,
    
  711.         )
    
  712.         # borrower2 reserves, rents, and returns book1.
    
  713.         Reservation.objects.create(
    
  714.             borrower=cls.borrower2,
    
  715.             book=cls.book1,
    
  716.             state=Reservation.STOPPED,
    
  717.         )
    
  718.         RentalSession.objects.create(
    
  719.             borrower=cls.borrower2,
    
  720.             book=cls.book1,
    
  721.             state=RentalSession.STOPPED,
    
  722.         )
    
  723. 
    
  724.     def test_aggregate(self):
    
  725.         """
    
  726.         filtered_relation() not only improves performance but also creates
    
  727.         correct results when aggregating with multiple LEFT JOINs.
    
  728. 
    
  729.         Books can be reserved then rented by a borrower. Each reservation and
    
  730.         rental session are recorded with Reservation and RentalSession models.
    
  731.         Every time a reservation or a rental session is over, their state is
    
  732.         changed to 'stopped'.
    
  733. 
    
  734.         Goal: Count number of books that are either currently reserved or
    
  735.         rented by borrower1 or available.
    
  736.         """
    
  737.         qs = (
    
  738.             Book.objects.annotate(
    
  739.                 is_reserved_or_rented_by=Case(
    
  740.                     When(
    
  741.                         reservation__state=Reservation.NEW,
    
  742.                         then=F("reservation__borrower__pk"),
    
  743.                     ),
    
  744.                     When(
    
  745.                         rental_session__state=RentalSession.NEW,
    
  746.                         then=F("rental_session__borrower__pk"),
    
  747.                     ),
    
  748.                     default=None,
    
  749.                 )
    
  750.             )
    
  751.             .filter(
    
  752.                 Q(is_reserved_or_rented_by=self.borrower1.pk) | Q(state=Book.AVAILABLE)
    
  753.             )
    
  754.             .distinct()
    
  755.         )
    
  756.         self.assertEqual(qs.count(), 1)
    
  757.         # If count is equal to 1, the same aggregation should return in the
    
  758.         # same result but it returns 4.
    
  759.         self.assertSequenceEqual(
    
  760.             qs.annotate(total=Count("pk")).values("total"), [{"total": 4}]
    
  761.         )
    
  762.         # With FilteredRelation, the result is as expected (1).
    
  763.         qs = (
    
  764.             Book.objects.annotate(
    
  765.                 active_reservations=FilteredRelation(
    
  766.                     "reservation",
    
  767.                     condition=Q(
    
  768.                         reservation__state=Reservation.NEW,
    
  769.                         reservation__borrower=self.borrower1,
    
  770.                     ),
    
  771.                 ),
    
  772.             )
    
  773.             .annotate(
    
  774.                 active_rental_sessions=FilteredRelation(
    
  775.                     "rental_session",
    
  776.                     condition=Q(
    
  777.                         rental_session__state=RentalSession.NEW,
    
  778.                         rental_session__borrower=self.borrower1,
    
  779.                     ),
    
  780.                 ),
    
  781.             )
    
  782.             .filter(
    
  783.                 (
    
  784.                     Q(active_reservations__isnull=False)
    
  785.                     | Q(active_rental_sessions__isnull=False)
    
  786.                 )
    
  787.                 | Q(state=Book.AVAILABLE)
    
  788.             )
    
  789.             .distinct()
    
  790.         )
    
  791.         self.assertEqual(qs.count(), 1)
    
  792.         self.assertSequenceEqual(
    
  793.             qs.annotate(total=Count("pk")).values("total"), [{"total": 1}]
    
  794.         )
    
  795. 
    
  796. 
    
  797. class FilteredRelationAnalyticalAggregationTests(TestCase):
    
  798.     @classmethod
    
  799.     def setUpTestData(cls):
    
  800.         author = Author.objects.create(name="Author")
    
  801.         editor = Editor.objects.create(name="Editor")
    
  802.         cls.book1 = Book.objects.create(
    
  803.             title="Poem by Alice",
    
  804.             editor=editor,
    
  805.             author=author,
    
  806.         )
    
  807.         cls.book2 = Book.objects.create(
    
  808.             title="The book by Jane A",
    
  809.             editor=editor,
    
  810.             author=author,
    
  811.         )
    
  812.         cls.book3 = Book.objects.create(
    
  813.             title="The book by Jane B",
    
  814.             editor=editor,
    
  815.             author=author,
    
  816.         )
    
  817.         cls.seller1 = Seller.objects.create(name="Seller 1")
    
  818.         cls.seller2 = Seller.objects.create(name="Seller 2")
    
  819.         cls.usd = Currency.objects.create(currency="USD")
    
  820.         cls.eur = Currency.objects.create(currency="EUR")
    
  821.         cls.sales_date1 = date(2020, 7, 6)
    
  822.         cls.sales_date2 = date(2020, 7, 7)
    
  823.         ExchangeRate.objects.bulk_create(
    
  824.             [
    
  825.                 ExchangeRate(
    
  826.                     rate_date=cls.sales_date1,
    
  827.                     from_currency=cls.usd,
    
  828.                     to_currency=cls.eur,
    
  829.                     rate=0.40,
    
  830.                 ),
    
  831.                 ExchangeRate(
    
  832.                     rate_date=cls.sales_date1,
    
  833.                     from_currency=cls.eur,
    
  834.                     to_currency=cls.usd,
    
  835.                     rate=1.60,
    
  836.                 ),
    
  837.                 ExchangeRate(
    
  838.                     rate_date=cls.sales_date2,
    
  839.                     from_currency=cls.usd,
    
  840.                     to_currency=cls.eur,
    
  841.                     rate=0.50,
    
  842.                 ),
    
  843.                 ExchangeRate(
    
  844.                     rate_date=cls.sales_date2,
    
  845.                     from_currency=cls.eur,
    
  846.                     to_currency=cls.usd,
    
  847.                     rate=1.50,
    
  848.                 ),
    
  849.                 ExchangeRate(
    
  850.                     rate_date=cls.sales_date2,
    
  851.                     from_currency=cls.usd,
    
  852.                     to_currency=cls.usd,
    
  853.                     rate=1.00,
    
  854.                 ),
    
  855.             ]
    
  856.         )
    
  857.         BookDailySales.objects.bulk_create(
    
  858.             [
    
  859.                 BookDailySales(
    
  860.                     book=cls.book1,
    
  861.                     sale_date=cls.sales_date1,
    
  862.                     currency=cls.usd,
    
  863.                     sales=100.00,
    
  864.                     seller=cls.seller1,
    
  865.                 ),
    
  866.                 BookDailySales(
    
  867.                     book=cls.book2,
    
  868.                     sale_date=cls.sales_date1,
    
  869.                     currency=cls.eur,
    
  870.                     sales=200.00,
    
  871.                     seller=cls.seller1,
    
  872.                 ),
    
  873.                 BookDailySales(
    
  874.                     book=cls.book1,
    
  875.                     sale_date=cls.sales_date2,
    
  876.                     currency=cls.usd,
    
  877.                     sales=50.00,
    
  878.                     seller=cls.seller2,
    
  879.                 ),
    
  880.                 BookDailySales(
    
  881.                     book=cls.book2,
    
  882.                     sale_date=cls.sales_date2,
    
  883.                     currency=cls.eur,
    
  884.                     sales=100.00,
    
  885.                     seller=cls.seller2,
    
  886.                 ),
    
  887.             ]
    
  888.         )
    
  889. 
    
  890.     def test_aggregate(self):
    
  891.         tests = [
    
  892.             Q(daily_sales__sale_date__gte=self.sales_date2),
    
  893.             ~Q(daily_sales__seller=self.seller1),
    
  894.         ]
    
  895.         for condition in tests:
    
  896.             with self.subTest(condition=condition):
    
  897.                 qs = (
    
  898.                     Book.objects.annotate(
    
  899.                         recent_sales=FilteredRelation(
    
  900.                             "daily_sales", condition=condition
    
  901.                         ),
    
  902.                         recent_sales_rates=FilteredRelation(
    
  903.                             "recent_sales__currency__rates_from",
    
  904.                             condition=Q(
    
  905.                                 recent_sales__currency__rates_from__rate_date=F(
    
  906.                                     "recent_sales__sale_date"
    
  907.                                 ),
    
  908.                                 recent_sales__currency__rates_from__to_currency=(
    
  909.                                     self.usd
    
  910.                                 ),
    
  911.                             ),
    
  912.                         ),
    
  913.                     )
    
  914.                     .annotate(
    
  915.                         sales_sum=Sum(
    
  916.                             F("recent_sales__sales") * F("recent_sales_rates__rate"),
    
  917.                             output_field=DecimalField(),
    
  918.                         ),
    
  919.                     )
    
  920.                     .values("title", "sales_sum")
    
  921.                     .order_by(
    
  922.                         F("sales_sum").desc(nulls_last=True),
    
  923.                     )
    
  924.                 )
    
  925.                 self.assertSequenceEqual(
    
  926.                     qs,
    
  927.                     [
    
  928.                         {"title": self.book2.title, "sales_sum": Decimal(150.00)},
    
  929.                         {"title": self.book1.title, "sales_sum": Decimal(50.00)},
    
  930.                         {"title": self.book3.title, "sales_sum": None},
    
  931.                     ],
    
  932.                 )