1. """
    
  2. Upload handlers to test the upload API.
    
  3. """
    
  4. import os
    
  5. from tempfile import NamedTemporaryFile
    
  6. 
    
  7. from django.core.files.uploadhandler import (
    
  8.     FileUploadHandler,
    
  9.     StopUpload,
    
  10.     TemporaryFileUploadHandler,
    
  11. )
    
  12. 
    
  13. 
    
  14. class QuotaUploadHandler(FileUploadHandler):
    
  15.     """
    
  16.     This test upload handler terminates the connection if more than a quota
    
  17.     (5MB) is uploaded.
    
  18.     """
    
  19. 
    
  20.     QUOTA = 5 * 2**20  # 5 MB
    
  21. 
    
  22.     def __init__(self, request=None):
    
  23.         super().__init__(request)
    
  24.         self.total_upload = 0
    
  25. 
    
  26.     def receive_data_chunk(self, raw_data, start):
    
  27.         self.total_upload += len(raw_data)
    
  28.         if self.total_upload >= self.QUOTA:
    
  29.             raise StopUpload(connection_reset=True)
    
  30.         return raw_data
    
  31. 
    
  32.     def file_complete(self, file_size):
    
  33.         return None
    
  34. 
    
  35. 
    
  36. class StopUploadTemporaryFileHandler(TemporaryFileUploadHandler):
    
  37.     """A handler that raises a StopUpload exception."""
    
  38. 
    
  39.     def receive_data_chunk(self, raw_data, start):
    
  40.         raise StopUpload()
    
  41. 
    
  42. 
    
  43. class CustomUploadError(Exception):
    
  44.     pass
    
  45. 
    
  46. 
    
  47. class ErroringUploadHandler(FileUploadHandler):
    
  48.     """A handler that raises an exception."""
    
  49. 
    
  50.     def receive_data_chunk(self, raw_data, start):
    
  51.         raise CustomUploadError("Oops!")
    
  52. 
    
  53. 
    
  54. class TraversalUploadHandler(FileUploadHandler):
    
  55.     """A handler with potential directory-traversal vulnerability."""
    
  56. 
    
  57.     def __init__(self, request=None):
    
  58.         from .views import UPLOAD_TO
    
  59. 
    
  60.         super().__init__(request)
    
  61.         self.upload_dir = UPLOAD_TO
    
  62. 
    
  63.     def file_complete(self, file_size):
    
  64.         self.file.seek(0)
    
  65.         self.file.size = file_size
    
  66.         with open(os.path.join(self.upload_dir, self.file_name), "wb") as fp:
    
  67.             fp.write(self.file.read())
    
  68.         return self.file
    
  69. 
    
  70.     def new_file(
    
  71.         self,
    
  72.         field_name,
    
  73.         file_name,
    
  74.         content_type,
    
  75.         content_length,
    
  76.         charset=None,
    
  77.         content_type_extra=None,
    
  78.     ):
    
  79.         super().new_file(
    
  80.             file_name,
    
  81.             file_name,
    
  82.             content_length,
    
  83.             content_length,
    
  84.             charset,
    
  85.             content_type_extra,
    
  86.         )
    
  87.         self.file = NamedTemporaryFile(suffix=".upload", dir=self.upload_dir)
    
  88. 
    
  89.     def receive_data_chunk(self, raw_data, start):
    
  90.         self.file.write(raw_data)