1. import datetime
    
  2. from decimal import Decimal
    
  3. 
    
  4. from django.db.models import (
    
  5.     Avg,
    
  6.     Case,
    
  7.     Count,
    
  8.     Exists,
    
  9.     F,
    
  10.     Max,
    
  11.     OuterRef,
    
  12.     Q,
    
  13.     StdDev,
    
  14.     Subquery,
    
  15.     Sum,
    
  16.     Variance,
    
  17.     When,
    
  18. )
    
  19. from django.test import TestCase
    
  20. from django.test.utils import Approximate
    
  21. 
    
  22. from .models import Author, Book, Publisher
    
  23. 
    
  24. 
    
  25. class FilteredAggregateTests(TestCase):
    
  26.     @classmethod
    
  27.     def setUpTestData(cls):
    
  28.         cls.a1 = Author.objects.create(name="test", age=40)
    
  29.         cls.a2 = Author.objects.create(name="test2", age=60)
    
  30.         cls.a3 = Author.objects.create(name="test3", age=100)
    
  31.         cls.p1 = Publisher.objects.create(
    
  32.             name="Apress", num_awards=3, duration=datetime.timedelta(days=1)
    
  33.         )
    
  34.         cls.b1 = Book.objects.create(
    
  35.             isbn="159059725",
    
  36.             name="The Definitive Guide to Django: Web Development Done Right",
    
  37.             pages=447,
    
  38.             rating=4.5,
    
  39.             price=Decimal("30.00"),
    
  40.             contact=cls.a1,
    
  41.             publisher=cls.p1,
    
  42.             pubdate=datetime.date(2007, 12, 6),
    
  43.         )
    
  44.         cls.b2 = Book.objects.create(
    
  45.             isbn="067232959",
    
  46.             name="Sams Teach Yourself Django in 24 Hours",
    
  47.             pages=528,
    
  48.             rating=3.0,
    
  49.             price=Decimal("23.09"),
    
  50.             contact=cls.a2,
    
  51.             publisher=cls.p1,
    
  52.             pubdate=datetime.date(2008, 3, 3),
    
  53.         )
    
  54.         cls.b3 = Book.objects.create(
    
  55.             isbn="159059996",
    
  56.             name="Practical Django Projects",
    
  57.             pages=600,
    
  58.             rating=4.5,
    
  59.             price=Decimal("29.69"),
    
  60.             contact=cls.a3,
    
  61.             publisher=cls.p1,
    
  62.             pubdate=datetime.date(2008, 6, 23),
    
  63.         )
    
  64.         cls.a1.friends.add(cls.a2)
    
  65.         cls.a1.friends.add(cls.a3)
    
  66.         cls.b1.authors.add(cls.a1)
    
  67.         cls.b1.authors.add(cls.a3)
    
  68.         cls.b2.authors.add(cls.a2)
    
  69.         cls.b3.authors.add(cls.a3)
    
  70. 
    
  71.     def test_filtered_aggregates(self):
    
  72.         agg = Sum("age", filter=Q(name__startswith="test"))
    
  73.         self.assertEqual(Author.objects.aggregate(age=agg)["age"], 200)
    
  74. 
    
  75.     def test_filtered_numerical_aggregates(self):
    
  76.         for aggregate, expected_result in (
    
  77.             (Avg, Approximate(66.7, 1)),
    
  78.             (StdDev, Approximate(24.9, 1)),
    
  79.             (Variance, Approximate(622.2, 1)),
    
  80.         ):
    
  81.             with self.subTest(aggregate=aggregate.__name__):
    
  82.                 agg = aggregate("age", filter=Q(name__startswith="test"))
    
  83.                 self.assertEqual(
    
  84.                     Author.objects.aggregate(age=agg)["age"], expected_result
    
  85.                 )
    
  86. 
    
  87.     def test_double_filtered_aggregates(self):
    
  88.         agg = Sum("age", filter=Q(Q(name="test2") & ~Q(name="test")))
    
  89.         self.assertEqual(Author.objects.aggregate(age=agg)["age"], 60)
    
  90. 
    
  91.     def test_excluded_aggregates(self):
    
  92.         agg = Sum("age", filter=~Q(name="test2"))
    
  93.         self.assertEqual(Author.objects.aggregate(age=agg)["age"], 140)
    
  94. 
    
  95.     def test_related_aggregates_m2m(self):
    
  96.         agg = Sum("friends__age", filter=~Q(friends__name="test"))
    
  97.         self.assertEqual(
    
  98.             Author.objects.filter(name="test").aggregate(age=agg)["age"], 160
    
  99.         )
    
  100. 
    
  101.     def test_related_aggregates_m2m_and_fk(self):
    
  102.         q = Q(friends__book__publisher__name="Apress") & ~Q(friends__name="test3")
    
  103.         agg = Sum("friends__book__pages", filter=q)
    
  104.         self.assertEqual(
    
  105.             Author.objects.filter(name="test").aggregate(pages=agg)["pages"], 528
    
  106.         )
    
  107. 
    
  108.     def test_plain_annotate(self):
    
  109.         agg = Sum("book__pages", filter=Q(book__rating__gt=3))
    
  110.         qs = Author.objects.annotate(pages=agg).order_by("pk")
    
  111.         self.assertSequenceEqual([a.pages for a in qs], [447, None, 1047])
    
  112. 
    
  113.     def test_filtered_aggregate_on_annotate(self):
    
  114.         pages_annotate = Sum("book__pages", filter=Q(book__rating__gt=3))
    
  115.         age_agg = Sum("age", filter=Q(total_pages__gte=400))
    
  116.         aggregated = Author.objects.annotate(total_pages=pages_annotate).aggregate(
    
  117.             summed_age=age_agg
    
  118.         )
    
  119.         self.assertEqual(aggregated, {"summed_age": 140})
    
  120. 
    
  121.     def test_case_aggregate(self):
    
  122.         agg = Sum(
    
  123.             Case(When(friends__age=40, then=F("friends__age"))),
    
  124.             filter=Q(friends__name__startswith="test"),
    
  125.         )
    
  126.         self.assertEqual(Author.objects.aggregate(age=agg)["age"], 80)
    
  127. 
    
  128.     def test_sum_star_exception(self):
    
  129.         msg = "Star cannot be used with filter. Please specify a field."
    
  130.         with self.assertRaisesMessage(ValueError, msg):
    
  131.             Count("*", filter=Q(age=40))
    
  132. 
    
  133.     def test_filtered_reused_subquery(self):
    
  134.         qs = Author.objects.annotate(
    
  135.             older_friends_count=Count("friends", filter=Q(friends__age__gt=F("age"))),
    
  136.         ).filter(
    
  137.             older_friends_count__gte=2,
    
  138.         )
    
  139.         self.assertEqual(qs.get(pk__in=qs.values("pk")), self.a1)
    
  140. 
    
  141.     def test_filtered_aggregate_ref_annotation(self):
    
  142.         aggs = Author.objects.annotate(
    
  143.             double_age=F("age") * 2,
    
  144.         ).aggregate(
    
  145.             cnt=Count("pk", filter=Q(double_age__gt=100)),
    
  146.         )
    
  147.         self.assertEqual(aggs["cnt"], 2)
    
  148. 
    
  149.     def test_filtered_aggregate_ref_subquery_annotation(self):
    
  150.         aggs = Author.objects.annotate(
    
  151.             earliest_book_year=Subquery(
    
  152.                 Book.objects.filter(
    
  153.                     contact__pk=OuterRef("pk"),
    
  154.                 )
    
  155.                 .order_by("pubdate")
    
  156.                 .values("pubdate__year")[:1]
    
  157.             ),
    
  158.         ).aggregate(
    
  159.             cnt=Count("pk", filter=Q(earliest_book_year=2008)),
    
  160.         )
    
  161.         self.assertEqual(aggs["cnt"], 2)
    
  162. 
    
  163.     def test_filtered_aggregate_ref_multiple_subquery_annotation(self):
    
  164.         aggregate = (
    
  165.             Book.objects.values("publisher")
    
  166.             .annotate(
    
  167.                 has_authors=Exists(
    
  168.                     Book.authors.through.objects.filter(book=OuterRef("pk")),
    
  169.                 ),
    
  170.                 authors_have_other_books=Exists(
    
  171.                     Book.objects.filter(
    
  172.                         authors__in=Author.objects.filter(
    
  173.                             book_contact_set=OuterRef(OuterRef("pk")),
    
  174.                         )
    
  175.                     ).exclude(pk=OuterRef("pk")),
    
  176.                 ),
    
  177.             )
    
  178.             .aggregate(
    
  179.                 max_rating=Max(
    
  180.                     "rating",
    
  181.                     filter=Q(has_authors=True, authors_have_other_books=False),
    
  182.                 )
    
  183.             )
    
  184.         )
    
  185.         self.assertEqual(aggregate, {"max_rating": 4.5})
    
  186. 
    
  187.     def test_filtered_aggregate_on_exists(self):
    
  188.         aggregate = Book.objects.values("publisher").aggregate(
    
  189.             max_rating=Max(
    
  190.                 "rating",
    
  191.                 filter=Exists(
    
  192.                     Book.authors.through.objects.filter(book=OuterRef("pk")),
    
  193.                 ),
    
  194.             ),
    
  195.         )
    
  196.         self.assertEqual(aggregate, {"max_rating": 4.5})