from datetime import date
from decimal import Decimal
from unittest import mock
from django.db import connection, transaction
from django.db.models import (
Case,Count,DecimalField,F,FilteredRelation,Q,Sum,When,)from django.test import TestCase
from django.test.testcases import skipUnlessDBFeature
from .models import (
Author,Book,BookDailySales,Borrower,Currency,Editor,ExchangeRate,RentalSession,Reservation,Seller,)class FilteredRelationTests(TestCase):
@classmethod
def setUpTestData(cls):
cls.author1 = Author.objects.create(name="Alice")
cls.author2 = Author.objects.create(name="Jane")
cls.editor_a = Editor.objects.create(name="a")
cls.editor_b = Editor.objects.create(name="b")
cls.book1 = Book.objects.create(
title="Poem by Alice",
editor=cls.editor_a,
author=cls.author1,
)cls.book1.generic_author.set([cls.author2])
cls.book2 = Book.objects.create(
title="The book by Jane A",
editor=cls.editor_b,
author=cls.author2,
)cls.book3 = Book.objects.create(
title="The book by Jane B",
editor=cls.editor_b,
author=cls.author2,
)cls.book4 = Book.objects.create(
title="The book by Alice",
editor=cls.editor_a,
author=cls.author1,
)cls.author1.favorite_books.add(cls.book2)
cls.author1.favorite_books.add(cls.book3)
def test_select_related(self):
qs = (Author.objects.annotate(
book_join=FilteredRelation("book"),
).select_related("book_join__editor")
.order_by("pk", "book_join__pk")
)with self.assertNumQueries(1):
self.assertQuerysetEqual(
qs,[(self.author1, self.book1, self.editor_a, self.author1),
(self.author1, self.book4, self.editor_a, self.author1),
(self.author2, self.book2, self.editor_b, self.author2),
(self.author2, self.book3, self.editor_b, self.author2),
],lambda x: (x, x.book_join, x.book_join.editor, x.book_join.author),
)def test_select_related_multiple(self):
qs = (Book.objects.annotate(
author_join=FilteredRelation("author"),
editor_join=FilteredRelation("editor"),
).select_related("author_join", "editor_join")
.order_by("pk")
)self.assertQuerysetEqual(
qs,[(self.book1, self.author1, self.editor_a),
(self.book2, self.author2, self.editor_b),
(self.book3, self.author2, self.editor_b),
(self.book4, self.author1, self.editor_a),
],lambda x: (x, x.author_join, x.editor_join),
)def test_select_related_with_empty_relation(self):
qs = (Author.objects.annotate(
book_join=FilteredRelation("book", condition=Q(pk=-1)),
).select_related("book_join")
.order_by("pk")
)self.assertSequenceEqual(qs, [self.author1, self.author2])
def test_select_related_foreign_key(self):
qs = (Book.objects.annotate(
author_join=FilteredRelation("author"),
).select_related("author_join")
.order_by("pk")
)with self.assertNumQueries(1):
self.assertQuerysetEqual(
qs,[(self.book1, self.author1),
(self.book2, self.author2),
(self.book3, self.author2),
(self.book4, self.author1),
],lambda x: (x, x.author_join),
)@skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of")
def test_select_related_foreign_key_for_update_of(self):
with transaction.atomic():
qs = (Book.objects.annotate(
author_join=FilteredRelation("author"),
).select_related("author_join")
.select_for_update(of=("self",))
.order_by("pk")
)with self.assertNumQueries(1):
self.assertQuerysetEqual(
qs,[(self.book1, self.author1),
(self.book2, self.author2),
(self.book3, self.author2),
(self.book4, self.author1),
],lambda x: (x, x.author_join),
)def test_without_join(self):
self.assertCountEqual(
Author.objects.annotate(
book_alice=FilteredRelation(
"book", condition=Q(book__title__iexact="poem by alice")
),),[self.author1, self.author2],
)def test_with_join(self):
self.assertSequenceEqual(
Author.objects.annotate(
book_alice=FilteredRelation(
"book", condition=Q(book__title__iexact="poem by alice")
),).filter(book_alice__isnull=False),
[self.author1],
)def test_with_exclude(self):
self.assertSequenceEqual(
Author.objects.annotate(
book_alice=FilteredRelation(
"book", condition=Q(book__title__iexact="poem by alice")
),).exclude(book_alice__isnull=False),
[self.author2],
)def test_with_join_and_complex_condition(self):
self.assertSequenceEqual(
Author.objects.annotate(
book_alice=FilteredRelation(
"book",
condition=Q(
Q(book__title__iexact="poem by alice")
| Q(book__state=Book.RENTED)
),),).filter(book_alice__isnull=False),
[self.author1],
)def test_internal_queryset_alias_mapping(self):
queryset = Author.objects.annotate(
book_alice=FilteredRelation(
"book", condition=Q(book__title__iexact="poem by alice")
),).filter(book_alice__isnull=False)
self.assertIn(
"INNER JOIN {} book_alice ON".format(
connection.ops.quote_name("filtered_relation_book")
),str(queryset.query),)def test_multiple(self):
qs = (Author.objects.annotate(
book_title_alice=FilteredRelation(
"book", condition=Q(book__title__contains="Alice")
),book_title_jane=FilteredRelation(
"book", condition=Q(book__title__icontains="Jane")
),).filter(name="Jane")
.values("book_title_alice__title", "book_title_jane__title")
)empty = "" if connection.features.interprets_empty_strings_as_nulls else None
self.assertCountEqual(
qs,[{"book_title_alice__title": empty,
"book_title_jane__title": "The book by Jane A",
},{"book_title_alice__title": empty,
"book_title_jane__title": "The book by Jane B",
},],)def test_with_multiple_filter(self):
self.assertSequenceEqual(
Author.objects.annotate(
book_editor_a=FilteredRelation(
"book",
condition=Q(
book__title__icontains="book", book__editor_id=self.editor_a.pk
),),).filter(book_editor_a__isnull=False),
[self.author1],
)def test_multiple_times(self):
self.assertSequenceEqual(
Author.objects.annotate(
book_title_alice=FilteredRelation(
"book", condition=Q(book__title__icontains="alice")
),).filter(book_title_alice__isnull=False)
.filter(book_title_alice__isnull=False)
.distinct(),
[self.author1],
)def test_exclude_relation_with_join(self):
self.assertSequenceEqual(
Author.objects.annotate(
book_alice=FilteredRelation(
"book", condition=~Q(book__title__icontains="alice")
),).filter(book_alice__isnull=False)
.distinct(),
[self.author2],
)def test_with_m2m(self):
qs = Author.objects.annotate(
favorite_books_written_by_jane=FilteredRelation(
"favorite_books",
condition=Q(favorite_books__in=[self.book2]),
),).filter(favorite_books_written_by_jane__isnull=False)
self.assertSequenceEqual(qs, [self.author1])
def test_with_m2m_deep(self):
qs = Author.objects.annotate(
favorite_books_written_by_jane=FilteredRelation(
"favorite_books",
condition=Q(favorite_books__author=self.author2),
),).filter(favorite_books_written_by_jane__title="The book by Jane B")
self.assertSequenceEqual(qs, [self.author1])
def test_with_m2m_multijoin(self):
qs = (Author.objects.annotate(
favorite_books_written_by_jane=FilteredRelation(
"favorite_books",
condition=Q(favorite_books__author=self.author2),
)).filter(favorite_books_written_by_jane__editor__name="b")
.distinct()
)self.assertSequenceEqual(qs, [self.author1])
def test_values_list(self):
self.assertSequenceEqual(
Author.objects.annotate(
book_alice=FilteredRelation(
"book", condition=Q(book__title__iexact="poem by alice")
),).filter(book_alice__isnull=False)
.values_list("book_alice__title", flat=True),
["Poem by Alice"],
)def test_values(self):
self.assertSequenceEqual(
Author.objects.annotate(
book_alice=FilteredRelation(
"book", condition=Q(book__title__iexact="poem by alice")
),).filter(book_alice__isnull=False)
.values(),
[{"id": self.author1.pk,
"name": "Alice",
"content_type_id": None,
"object_id": None,
}],)def test_extra(self):
self.assertSequenceEqual(
Author.objects.annotate(
book_alice=FilteredRelation(
"book", condition=Q(book__title__iexact="poem by alice")
),).filter(book_alice__isnull=False)
.extra(where=["1 = 1"]),
[self.author1],
)@skipUnlessDBFeature("supports_select_union")
def test_union(self):
qs1 = Author.objects.annotate(
book_alice=FilteredRelation(
"book", condition=Q(book__title__iexact="poem by alice")
),).filter(book_alice__isnull=False)
qs2 = Author.objects.annotate(
book_jane=FilteredRelation(
"book", condition=Q(book__title__iexact="the book by jane a")
),).filter(book_jane__isnull=False)
self.assertSequenceEqual(qs1.union(qs2), [self.author1, self.author2])
@skipUnlessDBFeature("supports_select_intersection")
def test_intersection(self):
qs1 = Author.objects.annotate(
book_alice=FilteredRelation(
"book", condition=Q(book__title__iexact="poem by alice")
),).filter(book_alice__isnull=False)
qs2 = Author.objects.annotate(
book_jane=FilteredRelation(
"book", condition=Q(book__title__iexact="the book by jane a")
),).filter(book_jane__isnull=False)
self.assertSequenceEqual(qs1.intersection(qs2), [])
@skipUnlessDBFeature("supports_select_difference")
def test_difference(self):
qs1 = Author.objects.annotate(
book_alice=FilteredRelation(
"book", condition=Q(book__title__iexact="poem by alice")
),).filter(book_alice__isnull=False)
qs2 = Author.objects.annotate(
book_jane=FilteredRelation(
"book", condition=Q(book__title__iexact="the book by jane a")
),).filter(book_jane__isnull=False)
self.assertSequenceEqual(qs1.difference(qs2), [self.author1])
def test_select_for_update(self):
self.assertSequenceEqual(
Author.objects.annotate(
book_jane=FilteredRelation(
"book", condition=Q(book__title__iexact="the book by jane a")
),).filter(book_jane__isnull=False)
.select_for_update(),
[self.author2],
)def test_defer(self):
# One query for the list and one query for the deferred title.
with self.assertNumQueries(2):
self.assertQuerysetEqual(
Author.objects.annotate(
book_alice=FilteredRelation(
"book", condition=Q(book__title__iexact="poem by alice")
),).filter(book_alice__isnull=False)
.select_related("book_alice")
.defer("book_alice__title"),
["Poem by Alice"],
lambda author: author.book_alice.title,
)def test_only_not_supported(self):
msg = "only() is not supported with FilteredRelation."
with self.assertRaisesMessage(ValueError, msg):
Author.objects.annotate(
book_alice=FilteredRelation(
"book", condition=Q(book__title__iexact="poem by alice")
),).filter(book_alice__isnull=False).select_related("book_alice").only(
"book_alice__state"
)def test_as_subquery(self):
inner_qs = Author.objects.annotate(
book_alice=FilteredRelation(
"book", condition=Q(book__title__iexact="poem by alice")
),).filter(book_alice__isnull=False)
qs = Author.objects.filter(id__in=inner_qs)
self.assertSequenceEqual(qs, [self.author1])
def test_nested_foreign_key(self):
qs = (Author.objects.annotate(
book_editor_worked_with=FilteredRelation(
"book__editor",
condition=Q(book__title__icontains="book by"),
),).filter(
book_editor_worked_with__isnull=False,
).select_related(
"book_editor_worked_with",
).order_by("pk", "book_editor_worked_with__pk")
)with self.assertNumQueries(1):
self.assertQuerysetEqual(
qs,[(self.author1, self.editor_a),
(self.author2, self.editor_b),
(self.author2, self.editor_b),
],lambda x: (x, x.book_editor_worked_with),
)def test_nested_foreign_key_nested_field(self):
qs = (Author.objects.annotate(
book_editor_worked_with=FilteredRelation(
"book__editor", condition=Q(book__title__icontains="book by")
),).filter(
book_editor_worked_with__isnull=False,
).values(
"name",
"book_editor_worked_with__name",
).order_by("name", "book_editor_worked_with__name")
.distinct()
)self.assertSequenceEqual(
qs,[{"name": self.author1.name,
"book_editor_worked_with__name": self.editor_a.name,
},{"name": self.author2.name,
"book_editor_worked_with__name": self.editor_b.name,
},],)def test_nested_foreign_key_filtered_base_object(self):
qs = (Author.objects.annotate(
alice_editors=FilteredRelation(
"book__editor",
condition=Q(name="Alice"),
),).values(
"name",
"alice_editors__pk",
).order_by("name", "alice_editors__name")
.distinct()
)self.assertSequenceEqual(
qs,[{"name": self.author1.name, "alice_editors__pk": self.editor_a.pk},
{"name": self.author2.name, "alice_editors__pk": None},
],)def test_nested_m2m_filtered(self):
qs = (Book.objects.annotate(
favorite_book=FilteredRelation(
"author__favorite_books",
condition=Q(author__favorite_books__title__icontains="book by"),
),).values(
"title",
"favorite_book__pk",
).order_by("title", "favorite_book__title")
)self.assertSequenceEqual(
qs,[{"title": self.book1.title, "favorite_book__pk": self.book2.pk},
{"title": self.book1.title, "favorite_book__pk": self.book3.pk},
{"title": self.book4.title, "favorite_book__pk": self.book2.pk},
{"title": self.book4.title, "favorite_book__pk": self.book3.pk},
{"title": self.book2.title, "favorite_book__pk": None},
{"title": self.book3.title, "favorite_book__pk": None},
],)def test_nested_chained_relations(self):
qs = (Author.objects.annotate(
my_books=FilteredRelation(
"book",
condition=Q(book__title__icontains="book by"),
),preferred_by_authors=FilteredRelation(
"my_books__preferred_by_authors",
condition=Q(my_books__preferred_by_authors__name="Alice"),
),).annotate(
author=F("name"),
book_title=F("my_books__title"),
preferred_by_author_pk=F("preferred_by_authors"),
).order_by("author", "book_title", "preferred_by_author_pk")
)self.assertQuerysetEqual(
qs,[("Alice", "The book by Alice", None),
("Jane", "The book by Jane A", self.author1.pk),
("Jane", "The book by Jane B", self.author1.pk),
],lambda x: (x.author, x.book_title, x.preferred_by_author_pk),
)def test_deep_nested_foreign_key(self):
qs = (Book.objects.annotate(
author_favorite_book_editor=FilteredRelation(
"author__favorite_books__editor",
condition=Q(author__favorite_books__title__icontains="Jane A"),
),).filter(
author_favorite_book_editor__isnull=False,
).select_related(
"author_favorite_book_editor",
).order_by("pk", "author_favorite_book_editor__pk")
)with self.assertNumQueries(1):
self.assertQuerysetEqual(
qs,[(self.book1, self.editor_b),
(self.book4, self.editor_b),
],lambda x: (x, x.author_favorite_book_editor),
)def test_relation_name_lookup(self):
msg = ("FilteredRelation's relation_name cannot contain lookups (got "
"'book__title__icontains')."
)with self.assertRaisesMessage(ValueError, msg):
Author.objects.annotate(
book_title=FilteredRelation(
"book__title__icontains",
condition=Q(book__title="Poem by Alice"),
),)def test_condition_outside_relation_name(self):
msg = ("FilteredRelation's condition doesn't support relations outside "
"the 'book__editor' (got 'book__author__name__icontains')."
)with self.assertRaisesMessage(ValueError, msg):
Author.objects.annotate(
book_editor=FilteredRelation(
"book__editor",
condition=Q(book__author__name__icontains="book"),
),)def test_condition_deeper_relation_name(self):
msg = ("FilteredRelation's condition doesn't support nested relations "
"deeper than the relation_name (got "
"'book__editor__name__icontains' for 'book')."
)with self.assertRaisesMessage(ValueError, msg):
Author.objects.annotate(
book_editor=FilteredRelation(
"book",
condition=Q(book__editor__name__icontains="b"),
),)def test_with_empty_relation_name_error(self):
with self.assertRaisesMessage(ValueError, "relation_name cannot be empty."):
FilteredRelation("", condition=Q(blank=""))
def test_with_condition_as_expression_error(self):
msg = "condition argument must be a Q() instance."
expression = Case(
When(book__title__iexact="poem by alice", then=True),
default=False,
)with self.assertRaisesMessage(ValueError, msg):
FilteredRelation("book", condition=expression)
def test_with_prefetch_related(self):
msg = "prefetch_related() is not supported with FilteredRelation."
qs = Author.objects.annotate(
book_title_contains_b=FilteredRelation(
"book", condition=Q(book__title__icontains="b")
),).filter(
book_title_contains_b__isnull=False,
)with self.assertRaisesMessage(ValueError, msg):
qs.prefetch_related("book_title_contains_b")
with self.assertRaisesMessage(ValueError, msg):
qs.prefetch_related("book_title_contains_b__editor")
def test_with_generic_foreign_key(self):
self.assertSequenceEqual(
Book.objects.annotate(
generic_authored_book=FilteredRelation(
"generic_author", condition=Q(generic_author__isnull=False)
),).filter(generic_authored_book__isnull=False),
[self.book1],
)def test_eq(self):
self.assertEqual(
FilteredRelation("book", condition=Q(book__title="b")), mock.ANY
)class FilteredRelationAggregationTests(TestCase):
@classmethod
def setUpTestData(cls):
cls.author1 = Author.objects.create(name="Alice")
cls.editor_a = Editor.objects.create(name="a")
cls.book1 = Book.objects.create(
title="Poem by Alice",
editor=cls.editor_a,
author=cls.author1,
)cls.borrower1 = Borrower.objects.create(name="Jenny")
cls.borrower2 = Borrower.objects.create(name="Kevin")
# borrower 1 reserves, rents, and returns book1.
Reservation.objects.create(
borrower=cls.borrower1,
book=cls.book1,
state=Reservation.STOPPED,
)RentalSession.objects.create(
borrower=cls.borrower1,
book=cls.book1,
state=RentalSession.STOPPED,
)# borrower2 reserves, rents, and returns book1.
Reservation.objects.create(
borrower=cls.borrower2,
book=cls.book1,
state=Reservation.STOPPED,
)RentalSession.objects.create(
borrower=cls.borrower2,
book=cls.book1,
state=RentalSession.STOPPED,
)def test_aggregate(self):
"""
filtered_relation() not only improves performance but also createscorrect results when aggregating with multiple LEFT JOINs.Books can be reserved then rented by a borrower. Each reservation andrental session are recorded with Reservation and RentalSession models.Every time a reservation or a rental session is over, their state ischanged to 'stopped'.Goal: Count number of books that are either currently reserved orrented by borrower1 or available."""qs = (Book.objects.annotate(
is_reserved_or_rented_by=Case(
When(
reservation__state=Reservation.NEW,
then=F("reservation__borrower__pk"),
),When(
rental_session__state=RentalSession.NEW,
then=F("rental_session__borrower__pk"),
),default=None,
)).filter(
Q(is_reserved_or_rented_by=self.borrower1.pk) | Q(state=Book.AVAILABLE)
).distinct()
)self.assertEqual(qs.count(), 1)
# If count is equal to 1, the same aggregation should return in the
# same result but it returns 4.
self.assertSequenceEqual(
qs.annotate(total=Count("pk")).values("total"), [{"total": 4}]
)# With FilteredRelation, the result is as expected (1).
qs = (Book.objects.annotate(
active_reservations=FilteredRelation(
"reservation",
condition=Q(
reservation__state=Reservation.NEW,
reservation__borrower=self.borrower1,
),),).annotate(
active_rental_sessions=FilteredRelation(
"rental_session",
condition=Q(
rental_session__state=RentalSession.NEW,
rental_session__borrower=self.borrower1,
),),).filter(
(Q(active_reservations__isnull=False)
| Q(active_rental_sessions__isnull=False)
)| Q(state=Book.AVAILABLE)
).distinct()
)self.assertEqual(qs.count(), 1)
self.assertSequenceEqual(
qs.annotate(total=Count("pk")).values("total"), [{"total": 1}]
)class FilteredRelationAnalyticalAggregationTests(TestCase):
@classmethod
def setUpTestData(cls):
author = Author.objects.create(name="Author")
editor = Editor.objects.create(name="Editor")
cls.book1 = Book.objects.create(
title="Poem by Alice",
editor=editor,
author=author,
)cls.book2 = Book.objects.create(
title="The book by Jane A",
editor=editor,
author=author,
)cls.book3 = Book.objects.create(
title="The book by Jane B",
editor=editor,
author=author,
)cls.seller1 = Seller.objects.create(name="Seller 1")
cls.seller2 = Seller.objects.create(name="Seller 2")
cls.usd = Currency.objects.create(currency="USD")
cls.eur = Currency.objects.create(currency="EUR")
cls.sales_date1 = date(2020, 7, 6)
cls.sales_date2 = date(2020, 7, 7)
ExchangeRate.objects.bulk_create(
[ExchangeRate(
rate_date=cls.sales_date1,
from_currency=cls.usd,
to_currency=cls.eur,
rate=0.40,
),ExchangeRate(
rate_date=cls.sales_date1,
from_currency=cls.eur,
to_currency=cls.usd,
rate=1.60,
),ExchangeRate(
rate_date=cls.sales_date2,
from_currency=cls.usd,
to_currency=cls.eur,
rate=0.50,
),ExchangeRate(
rate_date=cls.sales_date2,
from_currency=cls.eur,
to_currency=cls.usd,
rate=1.50,
),ExchangeRate(
rate_date=cls.sales_date2,
from_currency=cls.usd,
to_currency=cls.usd,
rate=1.00,
),])BookDailySales.objects.bulk_create(
[BookDailySales(
book=cls.book1,
sale_date=cls.sales_date1,
currency=cls.usd,
sales=100.00,
seller=cls.seller1,
),BookDailySales(
book=cls.book2,
sale_date=cls.sales_date1,
currency=cls.eur,
sales=200.00,
seller=cls.seller1,
),BookDailySales(
book=cls.book1,
sale_date=cls.sales_date2,
currency=cls.usd,
sales=50.00,
seller=cls.seller2,
),BookDailySales(
book=cls.book2,
sale_date=cls.sales_date2,
currency=cls.eur,
sales=100.00,
seller=cls.seller2,
),])def test_aggregate(self):
tests = [Q(daily_sales__sale_date__gte=self.sales_date2),
~Q(daily_sales__seller=self.seller1),
]for condition in tests:
with self.subTest(condition=condition):
qs = (Book.objects.annotate(
recent_sales=FilteredRelation(
"daily_sales", condition=condition
),recent_sales_rates=FilteredRelation(
"recent_sales__currency__rates_from",
condition=Q(
recent_sales__currency__rates_from__rate_date=F(
"recent_sales__sale_date"
),recent_sales__currency__rates_from__to_currency=(
self.usd
),),),).annotate(
sales_sum=Sum(
F("recent_sales__sales") * F("recent_sales_rates__rate"),
output_field=DecimalField(),
),).values("title", "sales_sum")
.order_by(
F("sales_sum").desc(nulls_last=True),
))self.assertSequenceEqual(
qs,[{"title": self.book2.title, "sales_sum": Decimal(150.00)},
{"title": self.book1.title, "sales_sum": Decimal(50.00)},
{"title": self.book3.title, "sales_sum": None},
],)