1. import pickle
    
  2. from datetime import datetime
    
  3. from functools import partialmethod
    
  4. from io import StringIO
    
  5. from unittest import mock, skipIf
    
  6. 
    
  7. from django.core import serializers
    
  8. from django.core.serializers import SerializerDoesNotExist
    
  9. from django.core.serializers.base import PickleSerializer, ProgressBar
    
  10. from django.db import connection, transaction
    
  11. from django.http import HttpResponse
    
  12. from django.test import SimpleTestCase, override_settings, skipUnlessDBFeature
    
  13. from django.test.utils import Approximate, ignore_warnings
    
  14. from django.utils.deprecation import RemovedInDjango50Warning
    
  15. 
    
  16. from .models import (
    
  17.     Actor,
    
  18.     Article,
    
  19.     Author,
    
  20.     AuthorProfile,
    
  21.     BaseModel,
    
  22.     Category,
    
  23.     Child,
    
  24.     ComplexModel,
    
  25.     Movie,
    
  26.     Player,
    
  27.     ProxyBaseModel,
    
  28.     ProxyProxyBaseModel,
    
  29.     Score,
    
  30.     Team,
    
  31. )
    
  32. 
    
  33. 
    
  34. @override_settings(
    
  35.     SERIALIZATION_MODULES={
    
  36.         "json2": "django.core.serializers.json",
    
  37.     }
    
  38. )
    
  39. class SerializerRegistrationTests(SimpleTestCase):
    
  40.     def setUp(self):
    
  41.         self.old_serializers = serializers._serializers
    
  42.         serializers._serializers = {}
    
  43. 
    
  44.     def tearDown(self):
    
  45.         serializers._serializers = self.old_serializers
    
  46. 
    
  47.     def test_register(self):
    
  48.         "Registering a new serializer populates the full registry. Refs #14823"
    
  49.         serializers.register_serializer("json3", "django.core.serializers.json")
    
  50. 
    
  51.         public_formats = serializers.get_public_serializer_formats()
    
  52.         self.assertIn("json3", public_formats)
    
  53.         self.assertIn("json2", public_formats)
    
  54.         self.assertIn("xml", public_formats)
    
  55. 
    
  56.     def test_unregister(self):
    
  57.         """
    
  58.         Unregistering a serializer doesn't cause the registry to be
    
  59.         repopulated.
    
  60.         """
    
  61.         serializers.unregister_serializer("xml")
    
  62.         serializers.register_serializer("json3", "django.core.serializers.json")
    
  63. 
    
  64.         public_formats = serializers.get_public_serializer_formats()
    
  65. 
    
  66.         self.assertNotIn("xml", public_formats)
    
  67.         self.assertIn("json3", public_formats)
    
  68. 
    
  69.     def test_unregister_unknown_serializer(self):
    
  70.         with self.assertRaises(SerializerDoesNotExist):
    
  71.             serializers.unregister_serializer("nonsense")
    
  72. 
    
  73.     def test_builtin_serializers(self):
    
  74.         "Requesting a list of serializer formats populates the registry"
    
  75.         all_formats = set(serializers.get_serializer_formats())
    
  76.         public_formats = set(serializers.get_public_serializer_formats())
    
  77. 
    
  78.         self.assertIn("xml", all_formats),
    
  79.         self.assertIn("xml", public_formats)
    
  80. 
    
  81.         self.assertIn("json2", all_formats)
    
  82.         self.assertIn("json2", public_formats)
    
  83. 
    
  84.         self.assertIn("python", all_formats)
    
  85.         self.assertNotIn("python", public_formats)
    
  86. 
    
  87.     def test_get_unknown_serializer(self):
    
  88.         """
    
  89.         #15889: get_serializer('nonsense') raises a SerializerDoesNotExist
    
  90.         """
    
  91.         with self.assertRaises(SerializerDoesNotExist):
    
  92.             serializers.get_serializer("nonsense")
    
  93. 
    
  94.         with self.assertRaises(KeyError):
    
  95.             serializers.get_serializer("nonsense")
    
  96. 
    
  97.         # SerializerDoesNotExist is instantiated with the nonexistent format
    
  98.         with self.assertRaisesMessage(SerializerDoesNotExist, "nonsense"):
    
  99.             serializers.get_serializer("nonsense")
    
  100. 
    
  101.     def test_get_unknown_deserializer(self):
    
  102.         with self.assertRaises(SerializerDoesNotExist):
    
  103.             serializers.get_deserializer("nonsense")
    
  104. 
    
  105. 
    
  106. class SerializersTestBase:
    
  107.     serializer_name = None  # Set by subclasses to the serialization format name
    
  108. 
    
  109.     @classmethod
    
  110.     def setUpTestData(cls):
    
  111.         sports = Category.objects.create(name="Sports")
    
  112.         music = Category.objects.create(name="Music")
    
  113.         op_ed = Category.objects.create(name="Op-Ed")
    
  114. 
    
  115.         cls.joe = Author.objects.create(name="Joe")
    
  116.         cls.jane = Author.objects.create(name="Jane")
    
  117. 
    
  118.         cls.a1 = Article(
    
  119.             author=cls.jane,
    
  120.             headline="Poker has no place on ESPN",
    
  121.             pub_date=datetime(2006, 6, 16, 11, 00),
    
  122.         )
    
  123.         cls.a1.save()
    
  124.         cls.a1.categories.set([sports, op_ed])
    
  125. 
    
  126.         cls.a2 = Article(
    
  127.             author=cls.joe,
    
  128.             headline="Time to reform copyright",
    
  129.             pub_date=datetime(2006, 6, 16, 13, 00, 11, 345),
    
  130.         )
    
  131.         cls.a2.save()
    
  132.         cls.a2.categories.set([music, op_ed])
    
  133. 
    
  134.     def test_serialize(self):
    
  135.         """Basic serialization works."""
    
  136.         serial_str = serializers.serialize(self.serializer_name, Article.objects.all())
    
  137.         self.assertTrue(self._validate_output(serial_str))
    
  138. 
    
  139.     def test_serializer_roundtrip(self):
    
  140.         """Serialized content can be deserialized."""
    
  141.         serial_str = serializers.serialize(self.serializer_name, Article.objects.all())
    
  142.         models = list(serializers.deserialize(self.serializer_name, serial_str))
    
  143.         self.assertEqual(len(models), 2)
    
  144. 
    
  145.     def test_serialize_to_stream(self):
    
  146.         obj = ComplexModel(field1="first", field2="second", field3="third")
    
  147.         obj.save_base(raw=True)
    
  148. 
    
  149.         # Serialize the test database to a stream
    
  150.         for stream in (StringIO(), HttpResponse()):
    
  151.             serializers.serialize(self.serializer_name, [obj], indent=2, stream=stream)
    
  152. 
    
  153.             # Serialize normally for a comparison
    
  154.             string_data = serializers.serialize(self.serializer_name, [obj], indent=2)
    
  155. 
    
  156.             # The two are the same
    
  157.             if isinstance(stream, StringIO):
    
  158.                 self.assertEqual(string_data, stream.getvalue())
    
  159.             else:
    
  160.                 self.assertEqual(string_data, stream.content.decode())
    
  161. 
    
  162.     def test_serialize_specific_fields(self):
    
  163.         obj = ComplexModel(field1="first", field2="second", field3="third")
    
  164.         obj.save_base(raw=True)
    
  165. 
    
  166.         # Serialize then deserialize the test database
    
  167.         serialized_data = serializers.serialize(
    
  168.             self.serializer_name, [obj], indent=2, fields=("field1", "field3")
    
  169.         )
    
  170.         result = next(serializers.deserialize(self.serializer_name, serialized_data))
    
  171. 
    
  172.         # The deserialized object contains data in only the serialized fields.
    
  173.         self.assertEqual(result.object.field1, "first")
    
  174.         self.assertEqual(result.object.field2, "")
    
  175.         self.assertEqual(result.object.field3, "third")
    
  176. 
    
  177.     def test_altering_serialized_output(self):
    
  178.         """
    
  179.         The ability to create new objects by modifying serialized content.
    
  180.         """
    
  181.         old_headline = "Poker has no place on ESPN"
    
  182.         new_headline = "Poker has no place on television"
    
  183.         serial_str = serializers.serialize(self.serializer_name, Article.objects.all())
    
  184.         serial_str = serial_str.replace(old_headline, new_headline)
    
  185.         models = list(serializers.deserialize(self.serializer_name, serial_str))
    
  186. 
    
  187.         # Prior to saving, old headline is in place
    
  188.         self.assertTrue(Article.objects.filter(headline=old_headline))
    
  189.         self.assertFalse(Article.objects.filter(headline=new_headline))
    
  190. 
    
  191.         for model in models:
    
  192.             model.save()
    
  193. 
    
  194.         # After saving, new headline is in place
    
  195.         self.assertTrue(Article.objects.filter(headline=new_headline))
    
  196.         self.assertFalse(Article.objects.filter(headline=old_headline))
    
  197. 
    
  198.     def test_one_to_one_as_pk(self):
    
  199.         """
    
  200.         If you use your own primary key field (such as a OneToOneField), it
    
  201.         doesn't appear in the serialized field list - it replaces the pk
    
  202.         identifier.
    
  203.         """
    
  204.         AuthorProfile.objects.create(
    
  205.             author=self.joe, date_of_birth=datetime(1970, 1, 1)
    
  206.         )
    
  207.         serial_str = serializers.serialize(
    
  208.             self.serializer_name, AuthorProfile.objects.all()
    
  209.         )
    
  210.         self.assertFalse(self._get_field_values(serial_str, "author"))
    
  211. 
    
  212.         for obj in serializers.deserialize(self.serializer_name, serial_str):
    
  213.             self.assertEqual(obj.object.pk, self.joe.pk)
    
  214. 
    
  215.     def test_serialize_field_subset(self):
    
  216.         """Output can be restricted to a subset of fields"""
    
  217.         valid_fields = ("headline", "pub_date")
    
  218.         invalid_fields = ("author", "categories")
    
  219.         serial_str = serializers.serialize(
    
  220.             self.serializer_name, Article.objects.all(), fields=valid_fields
    
  221.         )
    
  222.         for field_name in invalid_fields:
    
  223.             self.assertFalse(self._get_field_values(serial_str, field_name))
    
  224. 
    
  225.         for field_name in valid_fields:
    
  226.             self.assertTrue(self._get_field_values(serial_str, field_name))
    
  227. 
    
  228.     def test_serialize_unicode_roundtrip(self):
    
  229.         """Unicode makes the roundtrip intact"""
    
  230.         actor_name = "Za\u017c\u00f3\u0142\u0107"
    
  231.         movie_title = "G\u0119\u015bl\u0105 ja\u017a\u0144"
    
  232.         ac = Actor(name=actor_name)
    
  233.         mv = Movie(title=movie_title, actor=ac)
    
  234.         ac.save()
    
  235.         mv.save()
    
  236. 
    
  237.         serial_str = serializers.serialize(self.serializer_name, [mv])
    
  238.         self.assertEqual(self._get_field_values(serial_str, "title")[0], movie_title)
    
  239.         self.assertEqual(self._get_field_values(serial_str, "actor")[0], actor_name)
    
  240. 
    
  241.         obj_list = list(serializers.deserialize(self.serializer_name, serial_str))
    
  242.         mv_obj = obj_list[0].object
    
  243.         self.assertEqual(mv_obj.title, movie_title)
    
  244. 
    
  245.     def test_unicode_serialization(self):
    
  246.         unicode_name = "יוניקוד"
    
  247.         data = serializers.serialize(self.serializer_name, [Author(name=unicode_name)])
    
  248.         self.assertIn(unicode_name, data)
    
  249.         objs = list(serializers.deserialize(self.serializer_name, data))
    
  250.         self.assertEqual(objs[0].object.name, unicode_name)
    
  251. 
    
  252.     def test_serialize_progressbar(self):
    
  253.         fake_stdout = StringIO()
    
  254.         serializers.serialize(
    
  255.             self.serializer_name,
    
  256.             Article.objects.all(),
    
  257.             progress_output=fake_stdout,
    
  258.             object_count=Article.objects.count(),
    
  259.         )
    
  260.         self.assertTrue(
    
  261.             fake_stdout.getvalue().endswith(
    
  262.                 "[" + "." * ProgressBar.progress_width + "]\n"
    
  263.             )
    
  264.         )
    
  265. 
    
  266.     def test_serialize_superfluous_queries(self):
    
  267.         """Ensure no superfluous queries are made when serializing ForeignKeys
    
  268. 
    
  269.         #17602
    
  270.         """
    
  271.         ac = Actor(name="Actor name")
    
  272.         ac.save()
    
  273.         mv = Movie(title="Movie title", actor_id=ac.pk)
    
  274.         mv.save()
    
  275. 
    
  276.         with self.assertNumQueries(0):
    
  277.             serializers.serialize(self.serializer_name, [mv])
    
  278. 
    
  279.     def test_serialize_prefetch_related_m2m(self):
    
  280.         # One query for the Article table and one for each prefetched m2m
    
  281.         # field.
    
  282.         with self.assertNumQueries(3):
    
  283.             serializers.serialize(
    
  284.                 self.serializer_name,
    
  285.                 Article.objects.prefetch_related("categories", "meta_data"),
    
  286.             )
    
  287.         # One query for the Article table, and two m2m queries for each
    
  288.         # article.
    
  289.         with self.assertNumQueries(5):
    
  290.             serializers.serialize(self.serializer_name, Article.objects.all())
    
  291. 
    
  292.     def test_serialize_with_null_pk(self):
    
  293.         """
    
  294.         Serialized data with no primary key results
    
  295.         in a model instance with no id
    
  296.         """
    
  297.         category = Category(name="Reference")
    
  298.         serial_str = serializers.serialize(self.serializer_name, [category])
    
  299.         pk_value = self._get_pk_values(serial_str)[0]
    
  300.         self.assertFalse(pk_value)
    
  301. 
    
  302.         cat_obj = list(serializers.deserialize(self.serializer_name, serial_str))[
    
  303.             0
    
  304.         ].object
    
  305.         self.assertIsNone(cat_obj.id)
    
  306. 
    
  307.     def test_float_serialization(self):
    
  308.         """Float values serialize and deserialize intact"""
    
  309.         sc = Score(score=3.4)
    
  310.         sc.save()
    
  311.         serial_str = serializers.serialize(self.serializer_name, [sc])
    
  312.         deserial_objs = list(serializers.deserialize(self.serializer_name, serial_str))
    
  313.         self.assertEqual(deserial_objs[0].object.score, Approximate(3.4, places=1))
    
  314. 
    
  315.     def test_deferred_field_serialization(self):
    
  316.         author = Author.objects.create(name="Victor Hugo")
    
  317.         author = Author.objects.defer("name").get(pk=author.pk)
    
  318.         serial_str = serializers.serialize(self.serializer_name, [author])
    
  319.         deserial_objs = list(serializers.deserialize(self.serializer_name, serial_str))
    
  320.         self.assertIsInstance(deserial_objs[0].object, Author)
    
  321. 
    
  322.     def test_custom_field_serialization(self):
    
  323.         """Custom fields serialize and deserialize intact"""
    
  324.         team_str = "Spartak Moskva"
    
  325.         player = Player()
    
  326.         player.name = "Soslan Djanaev"
    
  327.         player.rank = 1
    
  328.         player.team = Team(team_str)
    
  329.         player.save()
    
  330.         serial_str = serializers.serialize(self.serializer_name, Player.objects.all())
    
  331.         team = self._get_field_values(serial_str, "team")
    
  332.         self.assertTrue(team)
    
  333.         self.assertEqual(team[0], team_str)
    
  334. 
    
  335.         deserial_objs = list(serializers.deserialize(self.serializer_name, serial_str))
    
  336.         self.assertEqual(
    
  337.             deserial_objs[0].object.team.to_string(), player.team.to_string()
    
  338.         )
    
  339. 
    
  340.     def test_pre_1000ad_date(self):
    
  341.         """Year values before 1000AD are properly formatted"""
    
  342.         # Regression for #12524 -- dates before 1000AD get prefixed
    
  343.         # 0's on the year
    
  344.         a = Article.objects.create(
    
  345.             author=self.jane,
    
  346.             headline="Nobody remembers the early years",
    
  347.             pub_date=datetime(1, 2, 3, 4, 5, 6),
    
  348.         )
    
  349. 
    
  350.         serial_str = serializers.serialize(self.serializer_name, [a])
    
  351.         date_values = self._get_field_values(serial_str, "pub_date")
    
  352.         self.assertEqual(date_values[0].replace("T", " "), "0001-02-03 04:05:06")
    
  353. 
    
  354.     def test_pkless_serialized_strings(self):
    
  355.         """
    
  356.         Serialized strings without PKs can be turned into models
    
  357.         """
    
  358.         deserial_objs = list(
    
  359.             serializers.deserialize(self.serializer_name, self.pkless_str)
    
  360.         )
    
  361.         for obj in deserial_objs:
    
  362.             self.assertFalse(obj.object.id)
    
  363.             obj.save()
    
  364.         self.assertEqual(Category.objects.count(), 5)
    
  365. 
    
  366.     def test_deterministic_mapping_ordering(self):
    
  367.         """Mapping such as fields should be deterministically ordered. (#24558)"""
    
  368.         output = serializers.serialize(self.serializer_name, [self.a1], indent=2)
    
  369.         categories = self.a1.categories.values_list("pk", flat=True)
    
  370.         self.assertEqual(
    
  371.             output,
    
  372.             self.mapping_ordering_str
    
  373.             % {
    
  374.                 "article_pk": self.a1.pk,
    
  375.                 "author_pk": self.a1.author_id,
    
  376.                 "first_category_pk": categories[0],
    
  377.                 "second_category_pk": categories[1],
    
  378.             },
    
  379.         )
    
  380. 
    
  381.     def test_deserialize_force_insert(self):
    
  382.         """Deserialized content can be saved with force_insert as a parameter."""
    
  383.         serial_str = serializers.serialize(self.serializer_name, [self.a1])
    
  384.         deserial_obj = list(serializers.deserialize(self.serializer_name, serial_str))[
    
  385.             0
    
  386.         ]
    
  387.         with mock.patch("django.db.models.Model") as mock_model:
    
  388.             deserial_obj.save(force_insert=False)
    
  389.             mock_model.save_base.assert_called_with(
    
  390.                 deserial_obj.object, raw=True, using=None, force_insert=False
    
  391.             )
    
  392. 
    
  393.     @skipUnlessDBFeature("can_defer_constraint_checks")
    
  394.     def test_serialize_proxy_model(self):
    
  395.         BaseModel.objects.create(parent_data=1)
    
  396.         base_objects = BaseModel.objects.all()
    
  397.         proxy_objects = ProxyBaseModel.objects.all()
    
  398.         proxy_proxy_objects = ProxyProxyBaseModel.objects.all()
    
  399.         base_data = serializers.serialize("json", base_objects)
    
  400.         proxy_data = serializers.serialize("json", proxy_objects)
    
  401.         proxy_proxy_data = serializers.serialize("json", proxy_proxy_objects)
    
  402.         self.assertEqual(base_data, proxy_data.replace("proxy", ""))
    
  403.         self.assertEqual(base_data, proxy_proxy_data.replace("proxy", ""))
    
  404. 
    
  405.     def test_serialize_inherited_fields(self):
    
  406.         child_1 = Child.objects.create(parent_data="a", child_data="b")
    
  407.         child_2 = Child.objects.create(parent_data="c", child_data="d")
    
  408.         child_1.parent_m2m.add(child_2)
    
  409.         child_data = serializers.serialize(self.serializer_name, [child_1, child_2])
    
  410.         self.assertEqual(self._get_field_values(child_data, "parent_m2m"), [])
    
  411.         self.assertEqual(self._get_field_values(child_data, "parent_data"), [])
    
  412. 
    
  413. 
    
  414. class SerializerAPITests(SimpleTestCase):
    
  415.     def test_stream_class(self):
    
  416.         class File:
    
  417.             def __init__(self):
    
  418.                 self.lines = []
    
  419. 
    
  420.             def write(self, line):
    
  421.                 self.lines.append(line)
    
  422. 
    
  423.             def getvalue(self):
    
  424.                 return "".join(self.lines)
    
  425. 
    
  426.         class Serializer(serializers.json.Serializer):
    
  427.             stream_class = File
    
  428. 
    
  429.         serializer = Serializer()
    
  430.         data = serializer.serialize([Score(id=1, score=3.4)])
    
  431.         self.assertIs(serializer.stream_class, File)
    
  432.         self.assertIsInstance(serializer.stream, File)
    
  433.         self.assertEqual(
    
  434.             data, '[{"model": "serializers.score", "pk": 1, "fields": {"score": 3.4}}]'
    
  435.         )
    
  436. 
    
  437. 
    
  438. class SerializersTransactionTestBase:
    
  439.     available_apps = ["serializers"]
    
  440. 
    
  441.     @skipUnlessDBFeature("supports_forward_references")
    
  442.     def test_forward_refs(self):
    
  443.         """
    
  444.         Objects ids can be referenced before they are
    
  445.         defined in the serialization data.
    
  446.         """
    
  447.         # The deserialization process needs to run in a transaction in order
    
  448.         # to test forward reference handling.
    
  449.         with transaction.atomic():
    
  450.             objs = serializers.deserialize(self.serializer_name, self.fwd_ref_str)
    
  451.             with connection.constraint_checks_disabled():
    
  452.                 for obj in objs:
    
  453.                     obj.save()
    
  454. 
    
  455.         for model_cls in (Category, Author, Article):
    
  456.             self.assertEqual(model_cls.objects.count(), 1)
    
  457.         art_obj = Article.objects.all()[0]
    
  458.         self.assertEqual(art_obj.categories.count(), 1)
    
  459.         self.assertEqual(art_obj.author.name, "Agnes")
    
  460. 
    
  461. 
    
  462. class PickleSerializerTests(SimpleTestCase):
    
  463.     @ignore_warnings(category=RemovedInDjango50Warning)
    
  464.     def test_serializer_protocol(self):
    
  465.         serializer = PickleSerializer(protocol=3)
    
  466.         self.assertEqual(serializer.protocol, 3)
    
  467.         # If protocol is not provided, it defaults to pickle.HIGHEST_PROTOCOL
    
  468.         serializer = PickleSerializer()
    
  469.         self.assertEqual(serializer.protocol, pickle.HIGHEST_PROTOCOL)
    
  470. 
    
  471.     @ignore_warnings(category=RemovedInDjango50Warning)
    
  472.     def test_serializer_loads_dumps(self):
    
  473.         serializer = PickleSerializer()
    
  474.         test_data = "test data"
    
  475.         dump = serializer.dumps(test_data)
    
  476.         self.assertEqual(serializer.loads(dump), test_data)
    
  477. 
    
  478.     def test_serializer_warning(self):
    
  479.         msg = (
    
  480.             "PickleSerializer is deprecated due to its security risk. Use "
    
  481.             "JSONSerializer instead."
    
  482.         )
    
  483.         with self.assertRaisesMessage(RemovedInDjango50Warning, msg):
    
  484.             PickleSerializer()
    
  485. 
    
  486. 
    
  487. def register_tests(test_class, method_name, test_func, exclude=()):
    
  488.     """
    
  489.     Dynamically create serializer tests to ensure that all registered
    
  490.     serializers are automatically tested.
    
  491.     """
    
  492.     for format_ in serializers.get_serializer_formats():
    
  493.         if format_ == "geojson" or format_ in exclude:
    
  494.             continue
    
  495.         decorated_func = skipIf(
    
  496.             isinstance(serializers.get_serializer(format_), serializers.BadSerializer),
    
  497.             "The Python library for the %s serializer is not installed." % format_,
    
  498.         )(test_func)
    
  499.         setattr(
    
  500.             test_class, method_name % format_, partialmethod(decorated_func, format_)
    
  501.         )