1. """Tests for django.db.backends.utils"""
    
  2. from decimal import Decimal, Rounded
    
  3. 
    
  4. from django.db import NotSupportedError, connection
    
  5. from django.db.backends.utils import (
    
  6.     format_number,
    
  7.     split_identifier,
    
  8.     split_tzname_delta,
    
  9.     truncate_name,
    
  10. )
    
  11. from django.test import (
    
  12.     SimpleTestCase,
    
  13.     TransactionTestCase,
    
  14.     skipIfDBFeature,
    
  15.     skipUnlessDBFeature,
    
  16. )
    
  17. 
    
  18. 
    
  19. class TestUtils(SimpleTestCase):
    
  20.     def test_truncate_name(self):
    
  21.         self.assertEqual(truncate_name("some_table", 10), "some_table")
    
  22.         self.assertEqual(truncate_name("some_long_table", 10), "some_la38a")
    
  23.         self.assertEqual(truncate_name("some_long_table", 10, 3), "some_loa38")
    
  24.         self.assertEqual(truncate_name("some_long_table"), "some_long_table")
    
  25.         # "user"."table" syntax
    
  26.         self.assertEqual(
    
  27.             truncate_name('username"."some_table', 10), 'username"."some_table'
    
  28.         )
    
  29.         self.assertEqual(
    
  30.             truncate_name('username"."some_long_table', 10), 'username"."some_la38a'
    
  31.         )
    
  32.         self.assertEqual(
    
  33.             truncate_name('username"."some_long_table', 10, 3), 'username"."some_loa38'
    
  34.         )
    
  35. 
    
  36.     def test_split_identifier(self):
    
  37.         self.assertEqual(split_identifier("some_table"), ("", "some_table"))
    
  38.         self.assertEqual(split_identifier('"some_table"'), ("", "some_table"))
    
  39.         self.assertEqual(
    
  40.             split_identifier('namespace"."some_table'), ("namespace", "some_table")
    
  41.         )
    
  42.         self.assertEqual(
    
  43.             split_identifier('"namespace"."some_table"'), ("namespace", "some_table")
    
  44.         )
    
  45. 
    
  46.     def test_format_number(self):
    
  47.         def equal(value, max_d, places, result):
    
  48.             self.assertEqual(format_number(Decimal(value), max_d, places), result)
    
  49. 
    
  50.         equal("0", 12, 3, "0.000")
    
  51.         equal("0", 12, 8, "0.00000000")
    
  52.         equal("1", 12, 9, "1.000000000")
    
  53.         equal("0.00000000", 12, 8, "0.00000000")
    
  54.         equal("0.000000004", 12, 8, "0.00000000")
    
  55.         equal("0.000000008", 12, 8, "0.00000001")
    
  56.         equal("0.000000000000000000999", 10, 8, "0.00000000")
    
  57.         equal("0.1234567890", 12, 10, "0.1234567890")
    
  58.         equal("0.1234567890", 12, 9, "0.123456789")
    
  59.         equal("0.1234567890", 12, 8, "0.12345679")
    
  60.         equal("0.1234567890", 12, 5, "0.12346")
    
  61.         equal("0.1234567890", 12, 3, "0.123")
    
  62.         equal("0.1234567890", 12, 1, "0.1")
    
  63.         equal("0.1234567890", 12, 0, "0")
    
  64.         equal("0.1234567890", None, 0, "0")
    
  65.         equal("1234567890.1234567890", None, 0, "1234567890")
    
  66.         equal("1234567890.1234567890", None, 2, "1234567890.12")
    
  67.         equal("0.1234", 5, None, "0.1234")
    
  68.         equal("123.12", 5, None, "123.12")
    
  69. 
    
  70.         with self.assertRaises(Rounded):
    
  71.             equal("0.1234567890", 5, None, "0.12346")
    
  72.         with self.assertRaises(Rounded):
    
  73.             equal("1234567890.1234", 5, None, "1234600000")
    
  74. 
    
  75.     def test_split_tzname_delta(self):
    
  76.         tests = [
    
  77.             ("Asia/Ust+Nera", ("Asia/Ust+Nera", None, None)),
    
  78.             ("Asia/Ust-Nera", ("Asia/Ust-Nera", None, None)),
    
  79.             ("Asia/Ust+Nera-02:00", ("Asia/Ust+Nera", "-", "02:00")),
    
  80.             ("Asia/Ust-Nera+05:00", ("Asia/Ust-Nera", "+", "05:00")),
    
  81.             ("America/Coral_Harbour-01:00", ("America/Coral_Harbour", "-", "01:00")),
    
  82.             ("America/Coral_Harbour+02:30", ("America/Coral_Harbour", "+", "02:30")),
    
  83.             ("UTC+15:00", ("UTC", "+", "15:00")),
    
  84.             ("UTC-04:43", ("UTC", "-", "04:43")),
    
  85.             ("UTC", ("UTC", None, None)),
    
  86.             ("UTC+1", ("UTC+1", None, None)),
    
  87.         ]
    
  88.         for tzname, expected in tests:
    
  89.             with self.subTest(tzname=tzname):
    
  90.                 self.assertEqual(split_tzname_delta(tzname), expected)
    
  91. 
    
  92. 
    
  93. class CursorWrapperTests(TransactionTestCase):
    
  94.     available_apps = []
    
  95. 
    
  96.     def _test_procedure(self, procedure_sql, params, param_types, kparams=None):
    
  97.         with connection.cursor() as cursor:
    
  98.             cursor.execute(procedure_sql)
    
  99.         # Use a new cursor because in MySQL a procedure can't be used in the
    
  100.         # same cursor in which it was created.
    
  101.         with connection.cursor() as cursor:
    
  102.             cursor.callproc("test_procedure", params, kparams)
    
  103.         with connection.schema_editor() as editor:
    
  104.             editor.remove_procedure("test_procedure", param_types)
    
  105. 
    
  106.     @skipUnlessDBFeature("create_test_procedure_without_params_sql")
    
  107.     def test_callproc_without_params(self):
    
  108.         self._test_procedure(
    
  109.             connection.features.create_test_procedure_without_params_sql, [], []
    
  110.         )
    
  111. 
    
  112.     @skipUnlessDBFeature("create_test_procedure_with_int_param_sql")
    
  113.     def test_callproc_with_int_params(self):
    
  114.         self._test_procedure(
    
  115.             connection.features.create_test_procedure_with_int_param_sql,
    
  116.             [1],
    
  117.             ["INTEGER"],
    
  118.         )
    
  119. 
    
  120.     @skipUnlessDBFeature(
    
  121.         "create_test_procedure_with_int_param_sql", "supports_callproc_kwargs"
    
  122.     )
    
  123.     def test_callproc_kparams(self):
    
  124.         self._test_procedure(
    
  125.             connection.features.create_test_procedure_with_int_param_sql,
    
  126.             [],
    
  127.             ["INTEGER"],
    
  128.             {"P_I": 1},
    
  129.         )
    
  130. 
    
  131.     @skipIfDBFeature("supports_callproc_kwargs")
    
  132.     def test_unsupported_callproc_kparams_raises_error(self):
    
  133.         msg = (
    
  134.             "Keyword parameters for callproc are not supported on this database "
    
  135.             "backend."
    
  136.         )
    
  137.         with self.assertRaisesMessage(NotSupportedError, msg):
    
  138.             with connection.cursor() as cursor:
    
  139.                 cursor.callproc("test_procedure", [], {"P_I": 1})