1. from datetime import datetime
    
  2. 
    
  3. from django.core.exceptions import FieldError
    
  4. from django.db import DEFAULT_DB_ALIAS, connection
    
  5. from django.db.models import BooleanField, CharField, F, Q
    
  6. from django.db.models.expressions import (
    
  7.     Col,
    
  8.     Exists,
    
  9.     ExpressionWrapper,
    
  10.     Func,
    
  11.     RawSQL,
    
  12.     Value,
    
  13. )
    
  14. from django.db.models.fields.related_lookups import RelatedIsNull
    
  15. from django.db.models.functions import Lower
    
  16. from django.db.models.lookups import Exact, GreaterThan, IsNull, LessThan
    
  17. from django.db.models.sql.constants import SINGLE
    
  18. from django.db.models.sql.query import JoinPromoter, Query, get_field_names_from_opts
    
  19. from django.db.models.sql.where import OR
    
  20. from django.test import SimpleTestCase, TestCase, skipUnlessDBFeature
    
  21. from django.test.utils import register_lookup
    
  22. 
    
  23. from .models import Author, Item, ObjectC, Ranking
    
  24. 
    
  25. 
    
  26. class TestQuery(SimpleTestCase):
    
  27.     def test_simple_query(self):
    
  28.         query = Query(Author)
    
  29.         where = query.build_where(Q(num__gt=2))
    
  30.         lookup = where.children[0]
    
  31.         self.assertIsInstance(lookup, GreaterThan)
    
  32.         self.assertEqual(lookup.rhs, 2)
    
  33.         self.assertEqual(lookup.lhs.target, Author._meta.get_field("num"))
    
  34. 
    
  35.     def test_non_alias_cols_query(self):
    
  36.         query = Query(Author, alias_cols=False)
    
  37.         where = query.build_where(Q(num__gt=2, name__isnull=False) | Q(num__lt=F("id")))
    
  38. 
    
  39.         name_isnull_lookup, num_gt_lookup = where.children[0].children
    
  40.         self.assertIsInstance(num_gt_lookup, GreaterThan)
    
  41.         self.assertIsInstance(num_gt_lookup.lhs, Col)
    
  42.         self.assertIsNone(num_gt_lookup.lhs.alias)
    
  43.         self.assertIsInstance(name_isnull_lookup, IsNull)
    
  44.         self.assertIsInstance(name_isnull_lookup.lhs, Col)
    
  45.         self.assertIsNone(name_isnull_lookup.lhs.alias)
    
  46. 
    
  47.         num_lt_lookup = where.children[1]
    
  48.         self.assertIsInstance(num_lt_lookup, LessThan)
    
  49.         self.assertIsInstance(num_lt_lookup.rhs, Col)
    
  50.         self.assertIsNone(num_lt_lookup.rhs.alias)
    
  51.         self.assertIsInstance(num_lt_lookup.lhs, Col)
    
  52.         self.assertIsNone(num_lt_lookup.lhs.alias)
    
  53. 
    
  54.     def test_complex_query(self):
    
  55.         query = Query(Author)
    
  56.         where = query.build_where(Q(num__gt=2) | Q(num__lt=0))
    
  57.         self.assertEqual(where.connector, OR)
    
  58. 
    
  59.         lookup = where.children[0]
    
  60.         self.assertIsInstance(lookup, GreaterThan)
    
  61.         self.assertEqual(lookup.rhs, 2)
    
  62.         self.assertEqual(lookup.lhs.target, Author._meta.get_field("num"))
    
  63. 
    
  64.         lookup = where.children[1]
    
  65.         self.assertIsInstance(lookup, LessThan)
    
  66.         self.assertEqual(lookup.rhs, 0)
    
  67.         self.assertEqual(lookup.lhs.target, Author._meta.get_field("num"))
    
  68. 
    
  69.     def test_multiple_fields(self):
    
  70.         query = Query(Item, alias_cols=False)
    
  71.         where = query.build_where(Q(modified__gt=F("created")))
    
  72.         lookup = where.children[0]
    
  73.         self.assertIsInstance(lookup, GreaterThan)
    
  74.         self.assertIsInstance(lookup.rhs, Col)
    
  75.         self.assertIsNone(lookup.rhs.alias)
    
  76.         self.assertIsInstance(lookup.lhs, Col)
    
  77.         self.assertIsNone(lookup.lhs.alias)
    
  78.         self.assertEqual(lookup.rhs.target, Item._meta.get_field("created"))
    
  79.         self.assertEqual(lookup.lhs.target, Item._meta.get_field("modified"))
    
  80. 
    
  81.     def test_transform(self):
    
  82.         query = Query(Author, alias_cols=False)
    
  83.         with register_lookup(CharField, Lower):
    
  84.             where = query.build_where(~Q(name__lower="foo"))
    
  85.         lookup = where.children[0]
    
  86.         self.assertIsInstance(lookup, Exact)
    
  87.         self.assertIsInstance(lookup.lhs, Lower)
    
  88.         self.assertIsInstance(lookup.lhs.lhs, Col)
    
  89.         self.assertIsNone(lookup.lhs.lhs.alias)
    
  90.         self.assertEqual(lookup.lhs.lhs.target, Author._meta.get_field("name"))
    
  91. 
    
  92.     def test_negated_nullable(self):
    
  93.         query = Query(Item)
    
  94.         where = query.build_where(~Q(modified__lt=datetime(2017, 1, 1)))
    
  95.         self.assertTrue(where.negated)
    
  96.         lookup = where.children[0]
    
  97.         self.assertIsInstance(lookup, LessThan)
    
  98.         self.assertEqual(lookup.lhs.target, Item._meta.get_field("modified"))
    
  99.         lookup = where.children[1]
    
  100.         self.assertIsInstance(lookup, IsNull)
    
  101.         self.assertEqual(lookup.lhs.target, Item._meta.get_field("modified"))
    
  102. 
    
  103.     def test_foreign_key(self):
    
  104.         query = Query(Item)
    
  105.         msg = "Joined field references are not permitted in this query"
    
  106.         with self.assertRaisesMessage(FieldError, msg):
    
  107.             query.build_where(Q(creator__num__gt=2))
    
  108. 
    
  109.     def test_foreign_key_f(self):
    
  110.         query = Query(Ranking)
    
  111.         with self.assertRaises(FieldError):
    
  112.             query.build_where(Q(rank__gt=F("author__num")))
    
  113. 
    
  114.     def test_foreign_key_exclusive(self):
    
  115.         query = Query(ObjectC, alias_cols=False)
    
  116.         where = query.build_where(Q(objecta=None) | Q(objectb=None))
    
  117.         a_isnull = where.children[0]
    
  118.         self.assertIsInstance(a_isnull, RelatedIsNull)
    
  119.         self.assertIsInstance(a_isnull.lhs, Col)
    
  120.         self.assertIsNone(a_isnull.lhs.alias)
    
  121.         self.assertEqual(a_isnull.lhs.target, ObjectC._meta.get_field("objecta"))
    
  122.         b_isnull = where.children[1]
    
  123.         self.assertIsInstance(b_isnull, RelatedIsNull)
    
  124.         self.assertIsInstance(b_isnull.lhs, Col)
    
  125.         self.assertIsNone(b_isnull.lhs.alias)
    
  126.         self.assertEqual(b_isnull.lhs.target, ObjectC._meta.get_field("objectb"))
    
  127. 
    
  128.     def test_clone_select_related(self):
    
  129.         query = Query(Item)
    
  130.         query.add_select_related(["creator"])
    
  131.         clone = query.clone()
    
  132.         clone.add_select_related(["note", "creator__extra"])
    
  133.         self.assertEqual(query.select_related, {"creator": {}})
    
  134. 
    
  135.     def test_iterable_lookup_value(self):
    
  136.         query = Query(Item)
    
  137.         where = query.build_where(Q(name=["a", "b"]))
    
  138.         name_exact = where.children[0]
    
  139.         self.assertIsInstance(name_exact, Exact)
    
  140.         self.assertEqual(name_exact.rhs, "['a', 'b']")
    
  141. 
    
  142.     def test_filter_conditional(self):
    
  143.         query = Query(Item)
    
  144.         where = query.build_where(Func(output_field=BooleanField()))
    
  145.         exact = where.children[0]
    
  146.         self.assertIsInstance(exact, Exact)
    
  147.         self.assertIsInstance(exact.lhs, Func)
    
  148.         self.assertIs(exact.rhs, True)
    
  149. 
    
  150.     def test_filter_conditional_join(self):
    
  151.         query = Query(Item)
    
  152.         filter_expr = Func("note__note", output_field=BooleanField())
    
  153.         msg = "Joined field references are not permitted in this query"
    
  154.         with self.assertRaisesMessage(FieldError, msg):
    
  155.             query.build_where(filter_expr)
    
  156. 
    
  157.     def test_filter_non_conditional(self):
    
  158.         query = Query(Item)
    
  159.         msg = "Cannot filter against a non-conditional expression."
    
  160.         with self.assertRaisesMessage(TypeError, msg):
    
  161.             query.build_where(Func(output_field=CharField()))
    
  162. 
    
  163. 
    
  164. class TestQueryNoModel(TestCase):
    
  165.     def test_rawsql_annotation(self):
    
  166.         query = Query(None)
    
  167.         sql = "%s IS NULL"
    
  168.         # Wrap with a CASE WHEN expression if a database backend (e.g. Oracle)
    
  169.         # doesn't support boolean expression in SELECT list.
    
  170.         if not connection.features.supports_boolean_expr_in_select_clause:
    
  171.             sql = f"CASE WHEN {sql} THEN 1 ELSE 0 END"
    
  172.         query.add_annotation(RawSQL(sql, (None,), BooleanField()), "_check")
    
  173.         result = query.get_compiler(using=DEFAULT_DB_ALIAS).execute_sql(SINGLE)
    
  174.         self.assertEqual(result[0], 1)
    
  175. 
    
  176.     def test_subquery_annotation(self):
    
  177.         query = Query(None)
    
  178.         query.add_annotation(Exists(Item.objects.all()), "_check")
    
  179.         result = query.get_compiler(using=DEFAULT_DB_ALIAS).execute_sql(SINGLE)
    
  180.         self.assertEqual(result[0], 0)
    
  181. 
    
  182.     @skipUnlessDBFeature("supports_boolean_expr_in_select_clause")
    
  183.     def test_q_annotation(self):
    
  184.         query = Query(None)
    
  185.         check = ExpressionWrapper(
    
  186.             Q(RawSQL("%s IS NULL", (None,), BooleanField()))
    
  187.             | Q(Exists(Item.objects.all())),
    
  188.             BooleanField(),
    
  189.         )
    
  190.         query.add_annotation(check, "_check")
    
  191.         result = query.get_compiler(using=DEFAULT_DB_ALIAS).execute_sql(SINGLE)
    
  192.         self.assertEqual(result[0], 1)
    
  193. 
    
  194.     def test_names_to_path_field(self):
    
  195.         query = Query(None)
    
  196.         query.add_annotation(Value(True), "value")
    
  197.         path, final_field, targets, names = query.names_to_path(["value"], opts=None)
    
  198.         self.assertEqual(path, [])
    
  199.         self.assertIsInstance(final_field, BooleanField)
    
  200.         self.assertEqual(len(targets), 1)
    
  201.         self.assertIsInstance(targets[0], BooleanField)
    
  202.         self.assertEqual(names, [])
    
  203. 
    
  204.     def test_names_to_path_field_error(self):
    
  205.         query = Query(None)
    
  206.         msg = "Cannot resolve keyword 'nonexistent' into field."
    
  207.         with self.assertRaisesMessage(FieldError, msg):
    
  208.             query.names_to_path(["nonexistent"], opts=None)
    
  209. 
    
  210.     def test_get_field_names_from_opts(self):
    
  211.         self.assertEqual(get_field_names_from_opts(None), set())
    
  212. 
    
  213. 
    
  214. class JoinPromoterTest(SimpleTestCase):
    
  215.     def test_repr(self):
    
  216.         self.assertEqual(
    
  217.             repr(JoinPromoter("AND", 3, True)),
    
  218.             "JoinPromoter(connector='AND', num_children=3, negated=True)",
    
  219.         )