1. from django.db import connection
    
  2. from django.db.models import CharField, F, Max
    
  3. from django.db.models.functions import Lower
    
  4. from django.test import TestCase, skipUnlessDBFeature
    
  5. from django.test.utils import register_lookup
    
  6. 
    
  7. from .models import Celebrity, Fan, Staff, StaffTag, Tag
    
  8. 
    
  9. 
    
  10. @skipUnlessDBFeature("can_distinct_on_fields")
    
  11. @skipUnlessDBFeature("supports_nullable_unique_constraints")
    
  12. class DistinctOnTests(TestCase):
    
  13.     @classmethod
    
  14.     def setUpTestData(cls):
    
  15.         cls.t1 = Tag.objects.create(name="t1")
    
  16.         cls.t2 = Tag.objects.create(name="t2", parent=cls.t1)
    
  17.         cls.t3 = Tag.objects.create(name="t3", parent=cls.t1)
    
  18.         cls.t4 = Tag.objects.create(name="t4", parent=cls.t3)
    
  19.         cls.t5 = Tag.objects.create(name="t5", parent=cls.t3)
    
  20. 
    
  21.         cls.p1_o1 = Staff.objects.create(id=1, name="p1", organisation="o1")
    
  22.         cls.p2_o1 = Staff.objects.create(id=2, name="p2", organisation="o1")
    
  23.         cls.p3_o1 = Staff.objects.create(id=3, name="p3", organisation="o1")
    
  24.         cls.p1_o2 = Staff.objects.create(id=4, name="p1", organisation="o2")
    
  25.         cls.p1_o1.coworkers.add(cls.p2_o1, cls.p3_o1)
    
  26.         cls.st1 = StaffTag.objects.create(staff=cls.p1_o1, tag=cls.t1)
    
  27.         StaffTag.objects.create(staff=cls.p1_o1, tag=cls.t1)
    
  28. 
    
  29.         cls.celeb1 = Celebrity.objects.create(name="c1")
    
  30.         cls.celeb2 = Celebrity.objects.create(name="c2")
    
  31. 
    
  32.         cls.fan1 = Fan.objects.create(fan_of=cls.celeb1)
    
  33.         cls.fan2 = Fan.objects.create(fan_of=cls.celeb1)
    
  34.         cls.fan3 = Fan.objects.create(fan_of=cls.celeb2)
    
  35. 
    
  36.     def test_basic_distinct_on(self):
    
  37.         """QuerySet.distinct('field', ...) works"""
    
  38.         # (qset, expected) tuples
    
  39.         qsets = (
    
  40.             (
    
  41.                 Staff.objects.distinct().order_by("name"),
    
  42.                 [self.p1_o1, self.p1_o2, self.p2_o1, self.p3_o1],
    
  43.             ),
    
  44.             (
    
  45.                 Staff.objects.distinct("name").order_by("name"),
    
  46.                 [self.p1_o1, self.p2_o1, self.p3_o1],
    
  47.             ),
    
  48.             (
    
  49.                 Staff.objects.distinct("organisation").order_by("organisation", "name"),
    
  50.                 [self.p1_o1, self.p1_o2],
    
  51.             ),
    
  52.             (
    
  53.                 Staff.objects.distinct("name", "organisation").order_by(
    
  54.                     "name", "organisation"
    
  55.                 ),
    
  56.                 [self.p1_o1, self.p1_o2, self.p2_o1, self.p3_o1],
    
  57.             ),
    
  58.             (
    
  59.                 Celebrity.objects.filter(fan__in=[self.fan1, self.fan2, self.fan3])
    
  60.                 .distinct("name")
    
  61.                 .order_by("name"),
    
  62.                 [self.celeb1, self.celeb2],
    
  63.             ),
    
  64.             # Does combining querysets work?
    
  65.             (
    
  66.                 (
    
  67.                     Celebrity.objects.filter(fan__in=[self.fan1, self.fan2])
    
  68.                     .distinct("name")
    
  69.                     .order_by("name")
    
  70.                     | Celebrity.objects.filter(fan__in=[self.fan3])
    
  71.                     .distinct("name")
    
  72.                     .order_by("name")
    
  73.                 ),
    
  74.                 [self.celeb1, self.celeb2],
    
  75.             ),
    
  76.             (StaffTag.objects.distinct("staff", "tag"), [self.st1]),
    
  77.             (
    
  78.                 Tag.objects.order_by("parent__pk", "pk").distinct("parent"),
    
  79.                 [self.t2, self.t4, self.t1]
    
  80.                 if connection.features.nulls_order_largest
    
  81.                 else [self.t1, self.t2, self.t4],
    
  82.             ),
    
  83.             (
    
  84.                 StaffTag.objects.select_related("staff")
    
  85.                 .distinct("staff__name")
    
  86.                 .order_by("staff__name"),
    
  87.                 [self.st1],
    
  88.             ),
    
  89.             # Fetch the alphabetically first coworker for each worker
    
  90.             (
    
  91.                 (
    
  92.                     Staff.objects.distinct("id")
    
  93.                     .order_by("id", "coworkers__name")
    
  94.                     .values_list("id", "coworkers__name")
    
  95.                 ),
    
  96.                 [(1, "p2"), (2, "p1"), (3, "p1"), (4, None)],
    
  97.             ),
    
  98.         )
    
  99.         for qset, expected in qsets:
    
  100.             self.assertSequenceEqual(qset, expected)
    
  101.             self.assertEqual(qset.count(), len(expected))
    
  102. 
    
  103.         # Combining queries with non-unique query is not allowed.
    
  104.         base_qs = Celebrity.objects.all()
    
  105.         msg = "Cannot combine a unique query with a non-unique query."
    
  106.         with self.assertRaisesMessage(TypeError, msg):
    
  107.             base_qs.distinct("id") & base_qs
    
  108.         # Combining queries with different distinct_fields is not allowed.
    
  109.         msg = "Cannot combine queries with different distinct fields."
    
  110.         with self.assertRaisesMessage(TypeError, msg):
    
  111.             base_qs.distinct("id") & base_qs.distinct("name")
    
  112. 
    
  113.         # Test join unreffing
    
  114.         c1 = Celebrity.objects.distinct("greatest_fan__id", "greatest_fan__fan_of")
    
  115.         self.assertIn("OUTER JOIN", str(c1.query))
    
  116.         c2 = c1.distinct("pk")
    
  117.         self.assertNotIn("OUTER JOIN", str(c2.query))
    
  118. 
    
  119.     def test_sliced_queryset(self):
    
  120.         msg = "Cannot create distinct fields once a slice has been taken."
    
  121.         with self.assertRaisesMessage(TypeError, msg):
    
  122.             Staff.objects.all()[0:5].distinct("name")
    
  123. 
    
  124.     def test_transform(self):
    
  125.         new_name = self.t1.name.upper()
    
  126.         self.assertNotEqual(self.t1.name, new_name)
    
  127.         Tag.objects.create(name=new_name)
    
  128.         with register_lookup(CharField, Lower):
    
  129.             self.assertCountEqual(
    
  130.                 Tag.objects.order_by().distinct("name__lower"),
    
  131.                 [self.t1, self.t2, self.t3, self.t4, self.t5],
    
  132.             )
    
  133. 
    
  134.     def test_distinct_not_implemented_checks(self):
    
  135.         # distinct + annotate not allowed
    
  136.         msg = "annotate() + distinct(fields) is not implemented."
    
  137.         with self.assertRaisesMessage(NotImplementedError, msg):
    
  138.             Celebrity.objects.annotate(Max("id")).distinct("id")[0]
    
  139.         with self.assertRaisesMessage(NotImplementedError, msg):
    
  140.             Celebrity.objects.distinct("id").annotate(Max("id"))[0]
    
  141. 
    
  142.         # However this check is done only when the query executes, so you
    
  143.         # can use distinct() to remove the fields before execution.
    
  144.         Celebrity.objects.distinct("id").annotate(Max("id")).distinct()[0]
    
  145.         # distinct + aggregate not allowed
    
  146.         msg = "aggregate() + distinct(fields) not implemented."
    
  147.         with self.assertRaisesMessage(NotImplementedError, msg):
    
  148.             Celebrity.objects.distinct("id").aggregate(Max("id"))
    
  149. 
    
  150.     def test_distinct_on_in_ordered_subquery(self):
    
  151.         qs = Staff.objects.distinct("name").order_by("name", "id")
    
  152.         qs = Staff.objects.filter(pk__in=qs).order_by("name")
    
  153.         self.assertSequenceEqual(qs, [self.p1_o1, self.p2_o1, self.p3_o1])
    
  154.         qs = Staff.objects.distinct("name").order_by("name", "-id")
    
  155.         qs = Staff.objects.filter(pk__in=qs).order_by("name")
    
  156.         self.assertSequenceEqual(qs, [self.p1_o2, self.p2_o1, self.p3_o1])
    
  157. 
    
  158.     def test_distinct_on_get_ordering_preserved(self):
    
  159.         """
    
  160.         Ordering shouldn't be cleared when distinct on fields are specified.
    
  161.         refs #25081
    
  162.         """
    
  163.         staff = (
    
  164.             Staff.objects.distinct("name")
    
  165.             .order_by("name", "-organisation")
    
  166.             .get(name="p1")
    
  167.         )
    
  168.         self.assertEqual(staff.organisation, "o2")
    
  169. 
    
  170.     def test_distinct_on_mixed_case_annotation(self):
    
  171.         qs = (
    
  172.             Staff.objects.annotate(
    
  173.                 nAmEAlIaS=F("name"),
    
  174.             )
    
  175.             .distinct("nAmEAlIaS")
    
  176.             .order_by("nAmEAlIaS")
    
  177.         )
    
  178.         self.assertSequenceEqual(qs, [self.p1_o1, self.p2_o1, self.p3_o1])