1. from django.core import serializers
    
  2. from django.db import connection
    
  3. from django.test import TestCase
    
  4. 
    
  5. from .models import (
    
  6.     Child,
    
  7.     FKAsPKNoNaturalKey,
    
  8.     FKDataNaturalKey,
    
  9.     NaturalKeyAnchor,
    
  10.     NaturalKeyThing,
    
  11.     NaturalPKWithDefault,
    
  12. )
    
  13. from .tests import register_tests
    
  14. 
    
  15. 
    
  16. class NaturalKeySerializerTests(TestCase):
    
  17.     pass
    
  18. 
    
  19. 
    
  20. def natural_key_serializer_test(self, format):
    
  21.     # Create all the objects defined in the test data
    
  22.     with connection.constraint_checks_disabled():
    
  23.         objects = [
    
  24.             NaturalKeyAnchor.objects.create(id=1100, data="Natural Key Anghor"),
    
  25.             FKDataNaturalKey.objects.create(id=1101, data_id=1100),
    
  26.             FKDataNaturalKey.objects.create(id=1102, data_id=None),
    
  27.         ]
    
  28.     # Serialize the test database
    
  29.     serialized_data = serializers.serialize(
    
  30.         format, objects, indent=2, use_natural_foreign_keys=True
    
  31.     )
    
  32. 
    
  33.     for obj in serializers.deserialize(format, serialized_data):
    
  34.         obj.save()
    
  35. 
    
  36.     # Assert that the deserialized data is the same
    
  37.     # as the original source
    
  38.     for obj in objects:
    
  39.         instance = obj.__class__.objects.get(id=obj.pk)
    
  40.         self.assertEqual(
    
  41.             obj.data,
    
  42.             instance.data,
    
  43.             "Objects with PK=%d not equal; expected '%s' (%s), got '%s' (%s)"
    
  44.             % (
    
  45.                 obj.pk,
    
  46.                 obj.data,
    
  47.                 type(obj.data),
    
  48.                 instance,
    
  49.                 type(instance.data),
    
  50.             ),
    
  51.         )
    
  52. 
    
  53. 
    
  54. def natural_key_test(self, format):
    
  55.     book1 = {
    
  56.         "data": "978-1590597255",
    
  57.         "title": "The Definitive Guide to Django: Web Development Done Right",
    
  58.     }
    
  59.     book2 = {"data": "978-1590599969", "title": "Practical Django Projects"}
    
  60. 
    
  61.     # Create the books.
    
  62.     adrian = NaturalKeyAnchor.objects.create(**book1)
    
  63.     james = NaturalKeyAnchor.objects.create(**book2)
    
  64. 
    
  65.     # Serialize the books.
    
  66.     string_data = serializers.serialize(
    
  67.         format,
    
  68.         NaturalKeyAnchor.objects.all(),
    
  69.         indent=2,
    
  70.         use_natural_foreign_keys=True,
    
  71.         use_natural_primary_keys=True,
    
  72.     )
    
  73. 
    
  74.     # Delete one book (to prove that the natural key generation will only
    
  75.     # restore the primary keys of books found in the database via the
    
  76.     # get_natural_key manager method).
    
  77.     james.delete()
    
  78. 
    
  79.     # Deserialize and test.
    
  80.     books = list(serializers.deserialize(format, string_data))
    
  81.     self.assertCountEqual(
    
  82.         [(book.object.title, book.object.pk) for book in books],
    
  83.         [
    
  84.             (book1["title"], adrian.pk),
    
  85.             (book2["title"], None),
    
  86.         ],
    
  87.     )
    
  88. 
    
  89. 
    
  90. def natural_pk_mti_test(self, format):
    
  91.     """
    
  92.     If serializing objects in a multi-table inheritance relationship using
    
  93.     natural primary keys, the natural foreign key for the parent is output in
    
  94.     the fields of the child so it's possible to relate the child to the parent
    
  95.     when deserializing.
    
  96.     """
    
  97.     child_1 = Child.objects.create(parent_data="1", child_data="1")
    
  98.     child_2 = Child.objects.create(parent_data="2", child_data="2")
    
  99.     string_data = serializers.serialize(
    
  100.         format,
    
  101.         [child_1.parent_ptr, child_2.parent_ptr, child_2, child_1],
    
  102.         use_natural_foreign_keys=True,
    
  103.         use_natural_primary_keys=True,
    
  104.     )
    
  105.     child_1.delete()
    
  106.     child_2.delete()
    
  107.     for obj in serializers.deserialize(format, string_data):
    
  108.         obj.save()
    
  109.     children = Child.objects.all()
    
  110.     self.assertEqual(len(children), 2)
    
  111.     for child in children:
    
  112.         # If it's possible to find the superclass from the subclass and it's
    
  113.         # the correct superclass, it's working.
    
  114.         self.assertEqual(child.child_data, child.parent_data)
    
  115. 
    
  116. 
    
  117. def forward_ref_fk_test(self, format):
    
  118.     t1 = NaturalKeyThing.objects.create(key="t1")
    
  119.     t2 = NaturalKeyThing.objects.create(key="t2", other_thing=t1)
    
  120.     t1.other_thing = t2
    
  121.     t1.save()
    
  122.     string_data = serializers.serialize(
    
  123.         format,
    
  124.         [t1, t2],
    
  125.         use_natural_primary_keys=True,
    
  126.         use_natural_foreign_keys=True,
    
  127.     )
    
  128.     NaturalKeyThing.objects.all().delete()
    
  129.     objs_with_deferred_fields = []
    
  130.     for obj in serializers.deserialize(
    
  131.         format, string_data, handle_forward_references=True
    
  132.     ):
    
  133.         obj.save()
    
  134.         if obj.deferred_fields:
    
  135.             objs_with_deferred_fields.append(obj)
    
  136.     for obj in objs_with_deferred_fields:
    
  137.         obj.save_deferred_fields()
    
  138.     t1 = NaturalKeyThing.objects.get(key="t1")
    
  139.     t2 = NaturalKeyThing.objects.get(key="t2")
    
  140.     self.assertEqual(t1.other_thing, t2)
    
  141.     self.assertEqual(t2.other_thing, t1)
    
  142. 
    
  143. 
    
  144. def forward_ref_fk_with_error_test(self, format):
    
  145.     t1 = NaturalKeyThing.objects.create(key="t1")
    
  146.     t2 = NaturalKeyThing.objects.create(key="t2", other_thing=t1)
    
  147.     t1.other_thing = t2
    
  148.     t1.save()
    
  149.     string_data = serializers.serialize(
    
  150.         format,
    
  151.         [t1],
    
  152.         use_natural_primary_keys=True,
    
  153.         use_natural_foreign_keys=True,
    
  154.     )
    
  155.     NaturalKeyThing.objects.all().delete()
    
  156.     objs_with_deferred_fields = []
    
  157.     for obj in serializers.deserialize(
    
  158.         format, string_data, handle_forward_references=True
    
  159.     ):
    
  160.         obj.save()
    
  161.         if obj.deferred_fields:
    
  162.             objs_with_deferred_fields.append(obj)
    
  163.     obj = objs_with_deferred_fields[0]
    
  164.     msg = "NaturalKeyThing matching query does not exist"
    
  165.     with self.assertRaisesMessage(serializers.base.DeserializationError, msg):
    
  166.         obj.save_deferred_fields()
    
  167. 
    
  168. 
    
  169. def forward_ref_m2m_test(self, format):
    
  170.     t1 = NaturalKeyThing.objects.create(key="t1")
    
  171.     t2 = NaturalKeyThing.objects.create(key="t2")
    
  172.     t3 = NaturalKeyThing.objects.create(key="t3")
    
  173.     t1.other_things.set([t2, t3])
    
  174.     string_data = serializers.serialize(
    
  175.         format,
    
  176.         [t1, t2, t3],
    
  177.         use_natural_primary_keys=True,
    
  178.         use_natural_foreign_keys=True,
    
  179.     )
    
  180.     NaturalKeyThing.objects.all().delete()
    
  181.     objs_with_deferred_fields = []
    
  182.     for obj in serializers.deserialize(
    
  183.         format, string_data, handle_forward_references=True
    
  184.     ):
    
  185.         obj.save()
    
  186.         if obj.deferred_fields:
    
  187.             objs_with_deferred_fields.append(obj)
    
  188.     for obj in objs_with_deferred_fields:
    
  189.         obj.save_deferred_fields()
    
  190.     t1 = NaturalKeyThing.objects.get(key="t1")
    
  191.     t2 = NaturalKeyThing.objects.get(key="t2")
    
  192.     t3 = NaturalKeyThing.objects.get(key="t3")
    
  193.     self.assertCountEqual(t1.other_things.all(), [t2, t3])
    
  194. 
    
  195. 
    
  196. def forward_ref_m2m_with_error_test(self, format):
    
  197.     t1 = NaturalKeyThing.objects.create(key="t1")
    
  198.     t2 = NaturalKeyThing.objects.create(key="t2")
    
  199.     t3 = NaturalKeyThing.objects.create(key="t3")
    
  200.     t1.other_things.set([t2, t3])
    
  201.     t1.save()
    
  202.     string_data = serializers.serialize(
    
  203.         format,
    
  204.         [t1, t2],
    
  205.         use_natural_primary_keys=True,
    
  206.         use_natural_foreign_keys=True,
    
  207.     )
    
  208.     NaturalKeyThing.objects.all().delete()
    
  209.     objs_with_deferred_fields = []
    
  210.     for obj in serializers.deserialize(
    
  211.         format, string_data, handle_forward_references=True
    
  212.     ):
    
  213.         obj.save()
    
  214.         if obj.deferred_fields:
    
  215.             objs_with_deferred_fields.append(obj)
    
  216.     obj = objs_with_deferred_fields[0]
    
  217.     msg = "NaturalKeyThing matching query does not exist"
    
  218.     with self.assertRaisesMessage(serializers.base.DeserializationError, msg):
    
  219.         obj.save_deferred_fields()
    
  220. 
    
  221. 
    
  222. def pk_with_default(self, format):
    
  223.     """
    
  224.     The deserializer works with natural keys when the primary key has a default
    
  225.     value.
    
  226.     """
    
  227.     obj = NaturalPKWithDefault.objects.create(name="name")
    
  228.     string_data = serializers.serialize(
    
  229.         format,
    
  230.         NaturalPKWithDefault.objects.all(),
    
  231.         use_natural_foreign_keys=True,
    
  232.         use_natural_primary_keys=True,
    
  233.     )
    
  234.     objs = list(serializers.deserialize(format, string_data))
    
  235.     self.assertEqual(len(objs), 1)
    
  236.     self.assertEqual(objs[0].object.pk, obj.pk)
    
  237. 
    
  238. 
    
  239. def fk_as_pk_natural_key_not_called(self, format):
    
  240.     """
    
  241.     The deserializer doesn't rely on natural keys when a model has a custom
    
  242.     primary key that is a ForeignKey.
    
  243.     """
    
  244.     o1 = NaturalKeyAnchor.objects.create(data="978-1590599969")
    
  245.     o2 = FKAsPKNoNaturalKey.objects.create(pk_fk=o1)
    
  246.     serialized_data = serializers.serialize(format, [o1, o2])
    
  247.     deserialized_objects = list(serializers.deserialize(format, serialized_data))
    
  248.     self.assertEqual(len(deserialized_objects), 2)
    
  249.     for obj in deserialized_objects:
    
  250.         self.assertEqual(obj.object.pk, o1.pk)
    
  251. 
    
  252. 
    
  253. # Dynamically register tests for each serializer
    
  254. register_tests(
    
  255.     NaturalKeySerializerTests,
    
  256.     "test_%s_natural_key_serializer",
    
  257.     natural_key_serializer_test,
    
  258. )
    
  259. register_tests(
    
  260.     NaturalKeySerializerTests, "test_%s_serializer_natural_keys", natural_key_test
    
  261. )
    
  262. register_tests(
    
  263.     NaturalKeySerializerTests, "test_%s_serializer_natural_pks_mti", natural_pk_mti_test
    
  264. )
    
  265. register_tests(
    
  266.     NaturalKeySerializerTests, "test_%s_forward_references_fks", forward_ref_fk_test
    
  267. )
    
  268. register_tests(
    
  269.     NaturalKeySerializerTests,
    
  270.     "test_%s_forward_references_fk_errors",
    
  271.     forward_ref_fk_with_error_test,
    
  272. )
    
  273. register_tests(
    
  274.     NaturalKeySerializerTests, "test_%s_forward_references_m2ms", forward_ref_m2m_test
    
  275. )
    
  276. register_tests(
    
  277.     NaturalKeySerializerTests,
    
  278.     "test_%s_forward_references_m2m_errors",
    
  279.     forward_ref_m2m_with_error_test,
    
  280. )
    
  281. register_tests(NaturalKeySerializerTests, "test_%s_pk_with_default", pk_with_default)
    
  282. register_tests(
    
  283.     NaturalKeySerializerTests,
    
  284.     "test_%s_fk_as_pk_natural_key_not_called",
    
  285.     fk_as_pk_natural_key_not_called,
    
  286. )