import copy
import datetime
import pickle
from operator import attrgetter
from django.core.exceptions import FieldError
from django.db import models
from django.test import SimpleTestCase, TestCase, skipUnlessDBFeature
from django.test.utils import isolate_apps
from django.utils import translation
from .models import (
Article,
ArticleIdea,
ArticleTag,
ArticleTranslation,
Country,
Friendship,
Group,
Membership,
NewsArticle,
Person,
)
# Note that these tests are testing internal implementation details.
# ForeignObject is not part of public API.
class MultiColumnFKTests(TestCase):
@classmethod
def setUpTestData(cls):
# Creating countries
cls.usa = Country.objects.create(name="United States of America")
cls.soviet_union = Country.objects.create(name="Soviet Union")
# Creating People
cls.bob = Person.objects.create(name="Bob", person_country=cls.usa)
cls.jim = Person.objects.create(name="Jim", person_country=cls.usa)
cls.george = Person.objects.create(name="George", person_country=cls.usa)
cls.jane = Person.objects.create(name="Jane", person_country=cls.soviet_union)
cls.mark = Person.objects.create(name="Mark", person_country=cls.soviet_union)
cls.sam = Person.objects.create(name="Sam", person_country=cls.soviet_union)
# Creating Groups
cls.kgb = Group.objects.create(name="KGB", group_country=cls.soviet_union)
cls.cia = Group.objects.create(name="CIA", group_country=cls.usa)
cls.republican = Group.objects.create(name="Republican", group_country=cls.usa)
cls.democrat = Group.objects.create(name="Democrat", group_country=cls.usa)
def test_get_succeeds_on_multicolumn_match(self):
# Membership objects have access to their related Person if both
# country_ids match between them
membership = Membership.objects.create(
membership_country_id=self.usa.id,
person_id=self.bob.id,
group_id=self.cia.id,
)
person = membership.person
self.assertEqual((person.id, person.name), (self.bob.id, "Bob"))
def test_get_fails_on_multicolumn_mismatch(self):
# Membership objects returns DoesNotExist error when there is no
# Person with the same id and country_id
membership = Membership.objects.create(
membership_country_id=self.usa.id,
person_id=self.jane.id,
group_id=self.cia.id,
)
with self.assertRaises(Person.DoesNotExist):
getattr(membership, "person")
def test_reverse_query_returns_correct_result(self):
# Creating a valid membership because it has the same country has the person
Membership.objects.create(
membership_country_id=self.usa.id,
person_id=self.bob.id,
group_id=self.cia.id,
)
# Creating an invalid membership because it has a different country has
# the person.
Membership.objects.create(
membership_country_id=self.soviet_union.id,
person_id=self.bob.id,
group_id=self.republican.id,
)
with self.assertNumQueries(1):
membership = self.bob.membership_set.get()
self.assertEqual(membership.group_id, self.cia.id)
self.assertIs(membership.person, self.bob)
def test_query_filters_correctly(self):
# Creating a to valid memberships
Membership.objects.create(
membership_country_id=self.usa.id,
person_id=self.bob.id,
group_id=self.cia.id,
)
Membership.objects.create(
membership_country_id=self.usa.id,
person_id=self.jim.id,
group_id=self.cia.id,
)
# Creating an invalid membership
Membership.objects.create(
membership_country_id=self.soviet_union.id,
person_id=self.george.id,
group_id=self.cia.id,
)
self.assertQuerysetEqual(
Membership.objects.filter(person__name__contains="o"),
[self.bob.id],
attrgetter("person_id"),
)
def test_reverse_query_filters_correctly(self):
timemark = datetime.datetime.now(tz=datetime.timezone.utc).replace(tzinfo=None)
timedelta = datetime.timedelta(days=1)
# Creating a to valid memberships
Membership.objects.create(
membership_country_id=self.usa.id,
person_id=self.bob.id,
group_id=self.cia.id,
date_joined=timemark - timedelta,
)
Membership.objects.create(
membership_country_id=self.usa.id,
person_id=self.jim.id,
group_id=self.cia.id,
date_joined=timemark + timedelta,
)
# Creating an invalid membership
Membership.objects.create(
membership_country_id=self.soviet_union.id,
person_id=self.george.id,
group_id=self.cia.id,
date_joined=timemark + timedelta,
)
self.assertQuerysetEqual(
Person.objects.filter(membership__date_joined__gte=timemark),
["Jim"],
attrgetter("name"),
)
def test_forward_in_lookup_filters_correctly(self):
Membership.objects.create(
membership_country_id=self.usa.id,
person_id=self.bob.id,
group_id=self.cia.id,
)
Membership.objects.create(
membership_country_id=self.usa.id,
person_id=self.jim.id,
group_id=self.cia.id,
)
# Creating an invalid membership
Membership.objects.create(
membership_country_id=self.soviet_union.id,
person_id=self.george.id,
group_id=self.cia.id,
)
self.assertQuerysetEqual(
Membership.objects.filter(person__in=[self.george, self.jim]),
[
self.jim.id,
],
attrgetter("person_id"),
)
self.assertQuerysetEqual(
Membership.objects.filter(person__in=Person.objects.filter(name="Jim")),
[
self.jim.id,
],
attrgetter("person_id"),
)
def test_double_nested_query(self):
m1 = Membership.objects.create(
membership_country_id=self.usa.id,
person_id=self.bob.id,
group_id=self.cia.id,
)
m2 = Membership.objects.create(
membership_country_id=self.usa.id,
person_id=self.jim.id,
group_id=self.cia.id,
)
Friendship.objects.create(
from_friend_country_id=self.usa.id,
from_friend_id=self.bob.id,
to_friend_country_id=self.usa.id,
to_friend_id=self.jim.id,
)
self.assertSequenceEqual(
Membership.objects.filter(
person__in=Person.objects.filter(
from_friend__in=Friendship.objects.filter(
to_friend__in=Person.objects.all()
)
)
),
[m1],
)
self.assertSequenceEqual(
Membership.objects.exclude(
person__in=Person.objects.filter(
from_friend__in=Friendship.objects.filter(
to_friend__in=Person.objects.all()
)
)
),
[m2],
)
def test_select_related_foreignkey_forward_works(self):
Membership.objects.create(
membership_country=self.usa, person=self.bob, group=self.cia
)
Membership.objects.create(
membership_country=self.usa, person=self.jim, group=self.democrat
)
with self.assertNumQueries(1):
people = [
m.person
for m in Membership.objects.select_related("person").order_by("pk")
]
normal_people = [m.person for m in Membership.objects.order_by("pk")]
self.assertEqual(people, normal_people)
def test_prefetch_foreignkey_forward_works(self):
Membership.objects.create(
membership_country=self.usa, person=self.bob, group=self.cia
)
Membership.objects.create(
membership_country=self.usa, person=self.jim, group=self.democrat
)
with self.assertNumQueries(2):
people = [
m.person
for m in Membership.objects.prefetch_related("person").order_by("pk")
]
normal_people = [m.person for m in Membership.objects.order_by("pk")]
self.assertEqual(people, normal_people)
def test_prefetch_foreignkey_reverse_works(self):
Membership.objects.create(
membership_country=self.usa, person=self.bob, group=self.cia
)
Membership.objects.create(
membership_country=self.usa, person=self.jim, group=self.democrat
)
with self.assertNumQueries(2):
membership_sets = [
list(p.membership_set.all())
for p in Person.objects.prefetch_related("membership_set").order_by(
"pk"
)
]
with self.assertNumQueries(7):
normal_membership_sets = [
list(p.membership_set.all()) for p in Person.objects.order_by("pk")
]
self.assertEqual(membership_sets, normal_membership_sets)
def test_m2m_through_forward_returns_valid_members(self):
# We start out by making sure that the Group 'CIA' has no members.
self.assertQuerysetEqual(self.cia.members.all(), [])
Membership.objects.create(
membership_country=self.usa, person=self.bob, group=self.cia
)
Membership.objects.create(
membership_country=self.usa, person=self.jim, group=self.cia
)
# Bob and Jim should be members of the CIA.
self.assertQuerysetEqual(
self.cia.members.all(), ["Bob", "Jim"], attrgetter("name")
)
def test_m2m_through_reverse_returns_valid_members(self):
# We start out by making sure that Bob is in no groups.
self.assertQuerysetEqual(self.bob.groups.all(), [])
Membership.objects.create(
membership_country=self.usa, person=self.bob, group=self.cia
)
Membership.objects.create(
membership_country=self.usa, person=self.bob, group=self.republican
)
# Bob should be in the CIA and a Republican
self.assertQuerysetEqual(
self.bob.groups.all(), ["CIA", "Republican"], attrgetter("name")
)
def test_m2m_through_forward_ignores_invalid_members(self):
# We start out by making sure that the Group 'CIA' has no members.
self.assertQuerysetEqual(self.cia.members.all(), [])
# Something adds jane to group CIA but Jane is in Soviet Union which
# isn't CIA's country.
Membership.objects.create(
membership_country=self.usa, person=self.jane, group=self.cia
)
# There should still be no members in CIA
self.assertQuerysetEqual(self.cia.members.all(), [])
def test_m2m_through_reverse_ignores_invalid_members(self):
# We start out by making sure that Jane has no groups.
self.assertQuerysetEqual(self.jane.groups.all(), [])
# Something adds jane to group CIA but Jane is in Soviet Union which
# isn't CIA's country.
Membership.objects.create(
membership_country=self.usa, person=self.jane, group=self.cia
)
# Jane should still not be in any groups
self.assertQuerysetEqual(self.jane.groups.all(), [])
def test_m2m_through_on_self_works(self):
self.assertQuerysetEqual(self.jane.friends.all(), [])
Friendship.objects.create(
from_friend_country=self.jane.person_country,
from_friend=self.jane,
to_friend_country=self.george.person_country,
to_friend=self.george,
)
self.assertQuerysetEqual(
self.jane.friends.all(), ["George"], attrgetter("name")
)
def test_m2m_through_on_self_ignores_mismatch_columns(self):
self.assertQuerysetEqual(self.jane.friends.all(), [])
# Note that we use ids instead of instances. This is because instances
# on ForeignObject properties will set all related field off of the
# given instance.
Friendship.objects.create(
from_friend_id=self.jane.id,
to_friend_id=self.george.id,
to_friend_country_id=self.jane.person_country_id,
from_friend_country_id=self.george.person_country_id,
)
self.assertQuerysetEqual(self.jane.friends.all(), [])
def test_prefetch_related_m2m_forward_works(self):
Membership.objects.create(
membership_country=self.usa, person=self.bob, group=self.cia
)
Membership.objects.create(
membership_country=self.usa, person=self.jim, group=self.democrat
)
with self.assertNumQueries(2):
members_lists = [
list(g.members.all()) for g in Group.objects.prefetch_related("members")
]
normal_members_lists = [list(g.members.all()) for g in Group.objects.all()]
self.assertEqual(members_lists, normal_members_lists)
def test_prefetch_related_m2m_reverse_works(self):
Membership.objects.create(
membership_country=self.usa, person=self.bob, group=self.cia
)
Membership.objects.create(
membership_country=self.usa, person=self.jim, group=self.democrat
)
with self.assertNumQueries(2):
groups_lists = [
list(p.groups.all()) for p in Person.objects.prefetch_related("groups")
]
normal_groups_lists = [list(p.groups.all()) for p in Person.objects.all()]
self.assertEqual(groups_lists, normal_groups_lists)
@translation.override("fi")
def test_translations(self):
a1 = Article.objects.create(pub_date=datetime.date.today())
at1_fi = ArticleTranslation(
article=a1, lang="fi", title="Otsikko", body="Diipadaapa"
)
at1_fi.save()
at2_en = ArticleTranslation(
article=a1, lang="en", title="Title", body="Lalalalala"
)
at2_en.save()
self.assertEqual(Article.objects.get(pk=a1.pk).active_translation, at1_fi)
with self.assertNumQueries(1):
fetched = Article.objects.select_related("active_translation").get(
active_translation__title="Otsikko"
)
self.assertEqual(fetched.active_translation.title, "Otsikko")
a2 = Article.objects.create(pub_date=datetime.date.today())
at2_fi = ArticleTranslation(
article=a2, lang="fi", title="Atsikko", body="Diipadaapa", abstract="dipad"
)
at2_fi.save()
a3 = Article.objects.create(pub_date=datetime.date.today())
at3_en = ArticleTranslation(
article=a3, lang="en", title="A title", body="lalalalala", abstract="lala"
)
at3_en.save()
# Test model initialization with active_translation field.
a3 = Article(id=a3.id, pub_date=a3.pub_date, active_translation=at3_en)
a3.save()
self.assertEqual(
list(Article.objects.filter(active_translation__abstract=None)), [a1, a3]
)
self.assertEqual(
list(
Article.objects.filter(
active_translation__abstract=None,
active_translation__pk__isnull=False,
)
),
[a1],
)
with translation.override("en"):
self.assertEqual(
list(Article.objects.filter(active_translation__abstract=None)),
[a1, a2],
)
def test_foreign_key_raises_informative_does_not_exist(self):
referrer = ArticleTranslation()
with self.assertRaisesMessage(
Article.DoesNotExist, "ArticleTranslation has no article"
):
referrer.article
def test_foreign_key_related_query_name(self):
a1 = Article.objects.create(pub_date=datetime.date.today())
ArticleTag.objects.create(article=a1, name="foo")
self.assertEqual(Article.objects.filter(tag__name="foo").count(), 1)
self.assertEqual(Article.objects.filter(tag__name="bar").count(), 0)
msg = (
"Cannot resolve keyword 'tags' into field. Choices are: "
"active_translation, active_translation_q, articletranslation, "
"id, idea_things, newsarticle, pub_date, tag"
)
with self.assertRaisesMessage(FieldError, msg):
Article.objects.filter(tags__name="foo")
def test_many_to_many_related_query_name(self):
a1 = Article.objects.create(pub_date=datetime.date.today())
i1 = ArticleIdea.objects.create(name="idea1")
a1.ideas.add(i1)
self.assertEqual(Article.objects.filter(idea_things__name="idea1").count(), 1)
self.assertEqual(Article.objects.filter(idea_things__name="idea2").count(), 0)
msg = (
"Cannot resolve keyword 'ideas' into field. Choices are: "
"active_translation, active_translation_q, articletranslation, "
"id, idea_things, newsarticle, pub_date, tag"
)
with self.assertRaisesMessage(FieldError, msg):
Article.objects.filter(ideas__name="idea1")
@translation.override("fi")
def test_inheritance(self):
na = NewsArticle.objects.create(pub_date=datetime.date.today())
ArticleTranslation.objects.create(
article=na, lang="fi", title="foo", body="bar"
)
self.assertSequenceEqual(
NewsArticle.objects.select_related("active_translation"), [na]
)
with self.assertNumQueries(1):
self.assertEqual(
NewsArticle.objects.select_related("active_translation")[
0
].active_translation.title,
"foo",
)
@skipUnlessDBFeature("has_bulk_insert")
def test_batch_create_foreign_object(self):
objs = [
Person(name="abcd_%s" % i, person_country=self.usa) for i in range(0, 5)
]
Person.objects.bulk_create(objs, 10)
def test_isnull_lookup(self):
m1 = Membership.objects.create(
membership_country=self.usa, person=self.bob, group_id=None
)
m2 = Membership.objects.create(
membership_country=self.usa, person=self.bob, group=self.cia
)
self.assertSequenceEqual(
Membership.objects.filter(group__isnull=True),
[m1],
)
self.assertSequenceEqual(
Membership.objects.filter(group__isnull=False),
[m2],
)
class TestModelCheckTests(SimpleTestCase):
@isolate_apps("foreign_object")
def test_check_composite_foreign_object(self):
class Parent(models.Model):
a = models.PositiveIntegerField()
b = models.PositiveIntegerField()
class Meta:
unique_together = (("a", "b"),)
class Child(models.Model):
a = models.PositiveIntegerField()
b = models.PositiveIntegerField()
value = models.CharField(max_length=255)
parent = models.ForeignObject(
Parent,
on_delete=models.SET_NULL,
from_fields=("a", "b"),
to_fields=("a", "b"),
related_name="children",
)
self.assertEqual(Child._meta.get_field("parent").check(from_model=Child), [])
@isolate_apps("foreign_object")
def test_check_subset_composite_foreign_object(self):
class Parent(models.Model):
a = models.PositiveIntegerField()
b = models.PositiveIntegerField()
c = models.PositiveIntegerField()
class Meta:
unique_together = (("a", "b"),)
class Child(models.Model):
a = models.PositiveIntegerField()
b = models.PositiveIntegerField()
c = models.PositiveIntegerField()
d = models.CharField(max_length=255)
parent = models.ForeignObject(
Parent,
on_delete=models.SET_NULL,
from_fields=("a", "b", "c"),
to_fields=("a", "b", "c"),
related_name="children",
)
self.assertEqual(Child._meta.get_field("parent").check(from_model=Child), [])
class TestExtraJoinFilterQ(TestCase):
@translation.override("fi")
def test_extra_join_filter_q(self):
a = Article.objects.create(pub_date=datetime.datetime.today())
ArticleTranslation.objects.create(
article=a, lang="fi", title="title", body="body"
)
qs = Article.objects.all()
with self.assertNumQueries(2):
self.assertEqual(qs[0].active_translation_q.title, "title")
qs = qs.select_related("active_translation_q")
with self.assertNumQueries(1):
self.assertEqual(qs[0].active_translation_q.title, "title")
class TestCachedPathInfo(TestCase):
def test_equality(self):
"""
The path_infos and reverse_path_infos attributes are equivalent to
calling the get_<method>() with no arguments.
"""
foreign_object = Membership._meta.get_field("person")
self.assertEqual(
foreign_object.path_infos,
foreign_object.get_path_info(),
)
self.assertEqual(
foreign_object.reverse_path_infos,
foreign_object.get_reverse_path_info(),
)
def test_copy_removes_direct_cached_values(self):
"""
Shallow copying a ForeignObject (or a ForeignObjectRel) removes the
object's direct cached PathInfo values.
"""
foreign_object = Membership._meta.get_field("person")
# Trigger storage of cached_property into ForeignObject's __dict__.
foreign_object.path_infos
foreign_object.reverse_path_infos
# The ForeignObjectRel doesn't have reverse_path_infos.
foreign_object.remote_field.path_infos
self.assertIn("path_infos", foreign_object.__dict__)
self.assertIn("reverse_path_infos", foreign_object.__dict__)
self.assertIn("path_infos", foreign_object.remote_field.__dict__)
# Cached value is removed via __getstate__() on ForeignObjectRel
# because no __copy__() method exists, so __reduce_ex__() is used.
remote_field_copy = copy.copy(foreign_object.remote_field)
self.assertNotIn("path_infos", remote_field_copy.__dict__)
# Cached values are removed via __copy__() on ForeignObject for
# consistency of behavior.
foreign_object_copy = copy.copy(foreign_object)
self.assertNotIn("path_infos", foreign_object_copy.__dict__)
self.assertNotIn("reverse_path_infos", foreign_object_copy.__dict__)
# ForeignObjectRel's remains because it's part of a shallow copy.
self.assertIn("path_infos", foreign_object_copy.remote_field.__dict__)
def test_deepcopy_removes_cached_values(self):
"""
Deep copying a ForeignObject removes the object's cached PathInfo
values, including those of the related ForeignObjectRel.
"""
foreign_object = Membership._meta.get_field("person")
# Trigger storage of cached_property into ForeignObject's __dict__.
foreign_object.path_infos
foreign_object.reverse_path_infos
# The ForeignObjectRel doesn't have reverse_path_infos.
foreign_object.remote_field.path_infos
self.assertIn("path_infos", foreign_object.__dict__)
self.assertIn("reverse_path_infos", foreign_object.__dict__)
self.assertIn("path_infos", foreign_object.remote_field.__dict__)
# Cached value is removed via __getstate__() on ForeignObjectRel
# because no __deepcopy__() method exists, so __reduce_ex__() is used.
remote_field_copy = copy.deepcopy(foreign_object.remote_field)
self.assertNotIn("path_infos", remote_field_copy.__dict__)
# Field.__deepcopy__() internally uses __copy__() on both the
# ForeignObject and ForeignObjectRel, so all cached values are removed.
foreign_object_copy = copy.deepcopy(foreign_object)
self.assertNotIn("path_infos", foreign_object_copy.__dict__)
self.assertNotIn("reverse_path_infos", foreign_object_copy.__dict__)
self.assertNotIn("path_infos", foreign_object_copy.remote_field.__dict__)
def test_pickling_foreignobjectrel(self):
"""
Pickling a ForeignObjectRel removes the path_infos attribute.
ForeignObjectRel implements __getstate__(), so copy and pickle modules
both use that, but ForeignObject implements __reduce__() and __copy__()
separately, so doesn't share the same behaviour.
"""
foreign_object_rel = Membership._meta.get_field("person").remote_field
# Trigger storage of cached_property into ForeignObjectRel's __dict__.
foreign_object_rel.path_infos
self.assertIn("path_infos", foreign_object_rel.__dict__)
foreign_object_rel_restored = pickle.loads(pickle.dumps(foreign_object_rel))
self.assertNotIn("path_infos", foreign_object_rel_restored.__dict__)
def test_pickling_foreignobject(self):
"""
Pickling a ForeignObject does not remove the cached PathInfo values.
ForeignObject will always keep the path_infos and reverse_path_infos
attributes within the same process, because of the way
Field.__reduce__() is used for restoring values.
"""
foreign_object = Membership._meta.get_field("person")
# Trigger storage of cached_property into ForeignObjectRel's __dict__
foreign_object.path_infos
foreign_object.reverse_path_infos
self.assertIn("path_infos", foreign_object.__dict__)
self.assertIn("reverse_path_infos", foreign_object.__dict__)
foreign_object_restored = pickle.loads(pickle.dumps(foreign_object))
self.assertIn("path_infos", foreign_object_restored.__dict__)
self.assertIn("reverse_path_infos", foreign_object_restored.__dict__)