| # -*- coding: utf-8 -*- |
| import errno |
| import os |
| import shutil |
| import sys |
| import tempfile |
| import time |
| from datetime import datetime, timedelta |
| try: |
| from cStringIO import StringIO |
| except ImportError: |
| from StringIO import StringIO |
| |
| try: |
| import threading |
| except ImportError: |
| import dummy_threading as threading |
| |
| from django.conf import settings |
| from django.core.exceptions import SuspiciousOperation, ImproperlyConfigured |
| from django.core.files.base import ContentFile |
| from django.core.files.images import get_image_dimensions |
| from django.core.files.storage import FileSystemStorage, get_storage_class |
| from django.core.files.uploadedfile import UploadedFile |
| from django.test import SimpleTestCase |
| from django.utils import unittest |
| |
| # Try to import PIL in either of the two ways it can end up installed. |
| # Checking for the existence of Image is enough for CPython, but |
| # for PyPy, you need to check for the underlying modules |
| try: |
| from PIL import Image, _imaging |
| except ImportError: |
| try: |
| import Image, _imaging |
| except ImportError: |
| Image = None |
| |
| |
| class GetStorageClassTests(SimpleTestCase): |
| |
| def test_get_filesystem_storage(self): |
| """ |
| get_storage_class returns the class for a storage backend name/path. |
| """ |
| self.assertEqual( |
| get_storage_class('django.core.files.storage.FileSystemStorage'), |
| FileSystemStorage) |
| |
| def test_get_invalid_storage_module(self): |
| """ |
| get_storage_class raises an error if the requested import don't exist. |
| """ |
| self.assertRaisesMessage( |
| ImproperlyConfigured, |
| "NonExistingStorage isn't a storage module.", |
| get_storage_class, |
| 'NonExistingStorage') |
| |
| def test_get_nonexisting_storage_class(self): |
| """ |
| get_storage_class raises an error if the requested class don't exist. |
| """ |
| self.assertRaisesMessage( |
| ImproperlyConfigured, |
| 'Storage module "django.core.files.storage" does not define a '\ |
| '"NonExistingStorage" class.', |
| get_storage_class, |
| 'django.core.files.storage.NonExistingStorage') |
| |
| def test_get_nonexisting_storage_module(self): |
| """ |
| get_storage_class raises an error if the requested module don't exist. |
| """ |
| # Error message may or may not be the fully qualified path. |
| self.assertRaisesRegexp( |
| ImproperlyConfigured, |
| ('Error importing storage module django.core.files.non_existing_' |
| 'storage: "No module named .*non_existing_storage"'), |
| get_storage_class, |
| 'django.core.files.non_existing_storage.NonExistingStorage' |
| ) |
| |
| class FileStorageTests(unittest.TestCase): |
| storage_class = FileSystemStorage |
| |
| def setUp(self): |
| self.temp_dir = tempfile.mkdtemp() |
| self.storage = self.storage_class(location=self.temp_dir, |
| base_url='/test_media_url/') |
| # Set up a second temporary directory which is ensured to have a mixed |
| # case name. |
| self.temp_dir2 = tempfile.mkdtemp(suffix='aBc') |
| |
| def tearDown(self): |
| shutil.rmtree(self.temp_dir) |
| shutil.rmtree(self.temp_dir2) |
| |
| def test_emtpy_location(self): |
| """ |
| Makes sure an exception is raised if the location is empty |
| """ |
| storage = self.storage_class(location='') |
| self.assertEqual(storage.base_location, '') |
| self.assertEqual(storage.location, os.getcwd()) |
| |
| def test_file_access_options(self): |
| """ |
| Standard file access options are available, and work as expected. |
| """ |
| self.assertFalse(self.storage.exists('storage_test')) |
| f = self.storage.open('storage_test', 'w') |
| f.write('storage contents') |
| f.close() |
| self.assertTrue(self.storage.exists('storage_test')) |
| |
| f = self.storage.open('storage_test', 'r') |
| self.assertEqual(f.read(), 'storage contents') |
| f.close() |
| |
| self.storage.delete('storage_test') |
| self.assertFalse(self.storage.exists('storage_test')) |
| |
| def test_file_accessed_time(self): |
| """ |
| File storage returns a Datetime object for the last accessed time of |
| a file. |
| """ |
| self.assertFalse(self.storage.exists('test.file')) |
| |
| f = ContentFile('custom contents') |
| f_name = self.storage.save('test.file', f) |
| atime = self.storage.accessed_time(f_name) |
| |
| self.assertEqual(atime, datetime.fromtimestamp( |
| os.path.getatime(self.storage.path(f_name)))) |
| self.assertTrue(datetime.now() - self.storage.accessed_time(f_name) < timedelta(seconds=2)) |
| self.storage.delete(f_name) |
| |
| def test_file_created_time(self): |
| """ |
| File storage returns a Datetime object for the creation time of |
| a file. |
| """ |
| self.assertFalse(self.storage.exists('test.file')) |
| |
| f = ContentFile('custom contents') |
| f_name = self.storage.save('test.file', f) |
| ctime = self.storage.created_time(f_name) |
| |
| self.assertEqual(ctime, datetime.fromtimestamp( |
| os.path.getctime(self.storage.path(f_name)))) |
| self.assertTrue(datetime.now() - self.storage.created_time(f_name) < timedelta(seconds=2)) |
| |
| self.storage.delete(f_name) |
| |
| def test_file_modified_time(self): |
| """ |
| File storage returns a Datetime object for the last modified time of |
| a file. |
| """ |
| self.assertFalse(self.storage.exists('test.file')) |
| |
| f = ContentFile('custom contents') |
| f_name = self.storage.save('test.file', f) |
| mtime = self.storage.modified_time(f_name) |
| |
| self.assertEqual(mtime, datetime.fromtimestamp( |
| os.path.getmtime(self.storage.path(f_name)))) |
| self.assertTrue(datetime.now() - self.storage.modified_time(f_name) < timedelta(seconds=2)) |
| |
| self.storage.delete(f_name) |
| |
| def test_file_save_without_name(self): |
| """ |
| File storage extracts the filename from the content object if no |
| name is given explicitly. |
| """ |
| self.assertFalse(self.storage.exists('test.file')) |
| |
| f = ContentFile('custom contents') |
| f.name = 'test.file' |
| |
| storage_f_name = self.storage.save(None, f) |
| |
| self.assertEqual(storage_f_name, f.name) |
| |
| self.assertTrue(os.path.exists(os.path.join(self.temp_dir, f.name))) |
| |
| self.storage.delete(storage_f_name) |
| |
| def test_file_save_with_path(self): |
| """ |
| Saving a pathname should create intermediate directories as necessary. |
| """ |
| self.assertFalse(self.storage.exists('path/to')) |
| self.storage.save('path/to/test.file', |
| ContentFile('file saved with path')) |
| |
| self.assertTrue(self.storage.exists('path/to')) |
| self.assertEqual(self.storage.open('path/to/test.file').read(), |
| 'file saved with path') |
| |
| self.assertTrue(os.path.exists( |
| os.path.join(self.temp_dir, 'path', 'to', 'test.file'))) |
| |
| self.storage.delete('path/to/test.file') |
| |
| def test_file_path(self): |
| """ |
| File storage returns the full path of a file |
| """ |
| self.assertFalse(self.storage.exists('test.file')) |
| |
| f = ContentFile('custom contents') |
| f_name = self.storage.save('test.file', f) |
| |
| self.assertEqual(self.storage.path(f_name), |
| os.path.join(self.temp_dir, f_name)) |
| |
| self.storage.delete(f_name) |
| |
| def test_file_url(self): |
| """ |
| File storage returns a url to access a given file from the Web. |
| """ |
| self.assertEqual(self.storage.url('test.file'), |
| '%s%s' % (self.storage.base_url, 'test.file')) |
| |
| # should encode special chars except ~!*()' |
| # like encodeURIComponent() JavaScript function do |
| self.assertEqual(self.storage.url(r"""~!*()'@#$%^&*abc`+ =.file"""), |
| """/test_media_url/~!*()'%40%23%24%25%5E%26*abc%60%2B%20%3D.file""") |
| |
| # should stanslate os path separator(s) to the url path separator |
| self.assertEqual(self.storage.url("""a/b\\c.file"""), |
| """/test_media_url/a/b/c.file""") |
| |
| self.storage.base_url = None |
| self.assertRaises(ValueError, self.storage.url, 'test.file') |
| |
| def test_listdir(self): |
| """ |
| File storage returns a tuple containing directories and files. |
| """ |
| self.assertFalse(self.storage.exists('storage_test_1')) |
| self.assertFalse(self.storage.exists('storage_test_2')) |
| self.assertFalse(self.storage.exists('storage_dir_1')) |
| |
| f = self.storage.save('storage_test_1', ContentFile('custom content')) |
| f = self.storage.save('storage_test_2', ContentFile('custom content')) |
| os.mkdir(os.path.join(self.temp_dir, 'storage_dir_1')) |
| |
| dirs, files = self.storage.listdir('') |
| self.assertEqual(set(dirs), set([u'storage_dir_1'])) |
| self.assertEqual(set(files), |
| set([u'storage_test_1', u'storage_test_2'])) |
| |
| self.storage.delete('storage_test_1') |
| self.storage.delete('storage_test_2') |
| os.rmdir(os.path.join(self.temp_dir, 'storage_dir_1')) |
| |
| def test_file_storage_prevents_directory_traversal(self): |
| """ |
| File storage prevents directory traversal (files can only be accessed if |
| they're below the storage location). |
| """ |
| self.assertRaises(SuspiciousOperation, self.storage.exists, '..') |
| self.assertRaises(SuspiciousOperation, self.storage.exists, '/etc/passwd') |
| |
| def test_file_storage_preserves_filename_case(self): |
| """The storage backend should preserve case of filenames.""" |
| # Create a storage backend associated with the mixed case name |
| # directory. |
| temp_storage = self.storage_class(location=self.temp_dir2) |
| # Ask that storage backend to store a file with a mixed case filename. |
| mixed_case = 'CaSe_SeNsItIvE' |
| file = temp_storage.open(mixed_case, 'w') |
| file.write('storage contents') |
| file.close() |
| self.assertEqual(os.path.join(self.temp_dir2, mixed_case), |
| temp_storage.path(mixed_case)) |
| temp_storage.delete(mixed_case) |
| |
| def test_makedirs_race_handling(self): |
| """ |
| File storage should be robust against directory creation race conditions. |
| """ |
| real_makedirs = os.makedirs |
| |
| # Monkey-patch os.makedirs, to simulate a normal call, a raced call, |
| # and an error. |
| def fake_makedirs(path): |
| if path == os.path.join(self.temp_dir, 'normal'): |
| real_makedirs(path) |
| elif path == os.path.join(self.temp_dir, 'raced'): |
| real_makedirs(path) |
| raise OSError(errno.EEXIST, 'simulated EEXIST') |
| elif path == os.path.join(self.temp_dir, 'error'): |
| raise OSError(errno.EACCES, 'simulated EACCES') |
| else: |
| self.fail('unexpected argument %r' % path) |
| |
| try: |
| os.makedirs = fake_makedirs |
| |
| self.storage.save('normal/test.file', |
| ContentFile('saved normally')) |
| self.assertEqual(self.storage.open('normal/test.file').read(), |
| 'saved normally') |
| |
| self.storage.save('raced/test.file', |
| ContentFile('saved with race')) |
| self.assertEqual(self.storage.open('raced/test.file').read(), |
| 'saved with race') |
| |
| # Check that OSErrors aside from EEXIST are still raised. |
| self.assertRaises(OSError, |
| self.storage.save, 'error/test.file', ContentFile('not saved')) |
| finally: |
| os.makedirs = real_makedirs |
| |
| def test_remove_race_handling(self): |
| """ |
| File storage should be robust against file removal race conditions. |
| """ |
| real_remove = os.remove |
| |
| # Monkey-patch os.remove, to simulate a normal call, a raced call, |
| # and an error. |
| def fake_remove(path): |
| if path == os.path.join(self.temp_dir, 'normal.file'): |
| real_remove(path) |
| elif path == os.path.join(self.temp_dir, 'raced.file'): |
| real_remove(path) |
| raise OSError(errno.ENOENT, 'simulated ENOENT') |
| elif path == os.path.join(self.temp_dir, 'error.file'): |
| raise OSError(errno.EACCES, 'simulated EACCES') |
| else: |
| self.fail('unexpected argument %r' % path) |
| |
| try: |
| os.remove = fake_remove |
| |
| self.storage.save('normal.file', ContentFile('delete normally')) |
| self.storage.delete('normal.file') |
| self.assertFalse(self.storage.exists('normal.file')) |
| |
| self.storage.save('raced.file', ContentFile('delete with race')) |
| self.storage.delete('raced.file') |
| self.assertFalse(self.storage.exists('normal.file')) |
| |
| # Check that OSErrors aside from ENOENT are still raised. |
| self.storage.save('error.file', ContentFile('delete with error')) |
| self.assertRaises(OSError, self.storage.delete, 'error.file') |
| finally: |
| os.remove = real_remove |
| |
| |
| class CustomStorage(FileSystemStorage): |
| def get_available_name(self, name): |
| """ |
| Append numbers to duplicate files rather than underscores, like Trac. |
| """ |
| parts = name.split('.') |
| basename, ext = parts[0], parts[1:] |
| number = 2 |
| while self.exists(name): |
| name = '.'.join([basename, str(number)] + ext) |
| number += 1 |
| |
| return name |
| |
| class CustomStorageTests(FileStorageTests): |
| storage_class = CustomStorage |
| |
| def test_custom_get_available_name(self): |
| first = self.storage.save('custom_storage', ContentFile('custom contents')) |
| self.assertEqual(first, 'custom_storage') |
| second = self.storage.save('custom_storage', ContentFile('more contents')) |
| self.assertEqual(second, 'custom_storage.2') |
| self.storage.delete(first) |
| self.storage.delete(second) |
| |
| class UnicodeFileNameTests(unittest.TestCase): |
| def test_unicode_file_names(self): |
| """ |
| Regression test for #8156: files with unicode names I can't quite figure |
| out the encoding situation between doctest and this file, but the actual |
| repr doesn't matter; it just shouldn't return a unicode object. |
| """ |
| uf = UploadedFile(name=u'¿Cómo?',content_type='text') |
| self.assertEqual(type(uf.__repr__()), str) |
| |
| # Tests for a race condition on file saving (#4948). |
| # This is written in such a way that it'll always pass on platforms |
| # without threading. |
| |
| class SlowFile(ContentFile): |
| def chunks(self): |
| time.sleep(1) |
| return super(ContentFile, self).chunks() |
| |
| class FileSaveRaceConditionTest(unittest.TestCase): |
| def setUp(self): |
| self.storage_dir = tempfile.mkdtemp() |
| self.storage = FileSystemStorage(self.storage_dir) |
| self.thread = threading.Thread(target=self.save_file, args=['conflict']) |
| |
| def tearDown(self): |
| shutil.rmtree(self.storage_dir) |
| |
| def save_file(self, name): |
| name = self.storage.save(name, SlowFile("Data")) |
| |
| def test_race_condition(self): |
| self.thread.start() |
| name = self.save_file('conflict') |
| self.thread.join() |
| self.assertTrue(self.storage.exists('conflict')) |
| self.assertTrue(self.storage.exists('conflict_1')) |
| self.storage.delete('conflict') |
| self.storage.delete('conflict_1') |
| |
| class FileStoragePermissions(unittest.TestCase): |
| def setUp(self): |
| self.old_perms = settings.FILE_UPLOAD_PERMISSIONS |
| settings.FILE_UPLOAD_PERMISSIONS = 0666 |
| self.storage_dir = tempfile.mkdtemp() |
| self.storage = FileSystemStorage(self.storage_dir) |
| |
| def tearDown(self): |
| settings.FILE_UPLOAD_PERMISSIONS = self.old_perms |
| shutil.rmtree(self.storage_dir) |
| |
| def test_file_upload_permissions(self): |
| name = self.storage.save("the_file", ContentFile("data")) |
| actual_mode = os.stat(self.storage.path(name))[0] & 0777 |
| self.assertEqual(actual_mode, 0666) |
| |
| |
| class FileStoragePathParsing(unittest.TestCase): |
| def setUp(self): |
| self.storage_dir = tempfile.mkdtemp() |
| self.storage = FileSystemStorage(self.storage_dir) |
| |
| def tearDown(self): |
| shutil.rmtree(self.storage_dir) |
| |
| def test_directory_with_dot(self): |
| """Regression test for #9610. |
| |
| If the directory name contains a dot and the file name doesn't, make |
| sure we still mangle the file name instead of the directory name. |
| """ |
| |
| self.storage.save('dotted.path/test', ContentFile("1")) |
| self.storage.save('dotted.path/test', ContentFile("2")) |
| |
| self.assertFalse(os.path.exists(os.path.join(self.storage_dir, 'dotted_.path'))) |
| self.assertTrue(os.path.exists(os.path.join(self.storage_dir, 'dotted.path/test'))) |
| self.assertTrue(os.path.exists(os.path.join(self.storage_dir, 'dotted.path/test_1'))) |
| |
| def test_first_character_dot(self): |
| """ |
| File names with a dot as their first character don't have an extension, |
| and the underscore should get added to the end. |
| """ |
| self.storage.save('dotted.path/.test', ContentFile("1")) |
| self.storage.save('dotted.path/.test', ContentFile("2")) |
| |
| self.assertTrue(os.path.exists(os.path.join(self.storage_dir, 'dotted.path/.test'))) |
| # Before 2.6, a leading dot was treated as an extension, and so |
| # underscore gets added to beginning instead of end. |
| if sys.version_info < (2, 6): |
| self.assertTrue(os.path.exists(os.path.join(self.storage_dir, 'dotted.path/_1.test'))) |
| else: |
| self.assertTrue(os.path.exists(os.path.join(self.storage_dir, 'dotted.path/.test_1'))) |
| |
| class DimensionClosingBug(unittest.TestCase): |
| """ |
| Test that get_image_dimensions() properly closes files (#8817) |
| """ |
| @unittest.skipUnless(Image, "PIL not installed") |
| def test_not_closing_of_files(self): |
| """ |
| Open files passed into get_image_dimensions() should stay opened. |
| """ |
| empty_io = StringIO() |
| try: |
| get_image_dimensions(empty_io) |
| finally: |
| self.assertTrue(not empty_io.closed) |
| |
| @unittest.skipUnless(Image, "PIL not installed") |
| def test_closing_of_filenames(self): |
| """ |
| get_image_dimensions() called with a filename should closed the file. |
| """ |
| # We need to inject a modified open() builtin into the images module |
| # that checks if the file was closed properly if the function is |
| # called with a filename instead of an file object. |
| # get_image_dimensions will call our catching_open instead of the |
| # regular builtin one. |
| |
| class FileWrapper(object): |
| _closed = [] |
| def __init__(self, f): |
| self.f = f |
| def __getattr__(self, name): |
| return getattr(self.f, name) |
| def close(self): |
| self._closed.append(True) |
| self.f.close() |
| |
| def catching_open(*args): |
| return FileWrapper(open(*args)) |
| |
| from django.core.files import images |
| images.open = catching_open |
| try: |
| get_image_dimensions(os.path.join(os.path.dirname(__file__), "test1.png")) |
| finally: |
| del images.open |
| self.assertTrue(FileWrapper._closed) |
| |
| class InconsistentGetImageDimensionsBug(unittest.TestCase): |
| """ |
| Test that get_image_dimensions() works properly after various calls |
| using a file handler (#11158) |
| """ |
| @unittest.skipUnless(Image, "PIL not installed") |
| def test_multiple_calls(self): |
| """ |
| Multiple calls of get_image_dimensions() should return the same size. |
| """ |
| from django.core.files.images import ImageFile |
| |
| img_path = os.path.join(os.path.dirname(__file__), "test.png") |
| image = ImageFile(open(img_path, 'rb')) |
| image_pil = Image.open(img_path) |
| size_1, size_2 = get_image_dimensions(image), get_image_dimensions(image) |
| self.assertEqual(image_pil.size, size_1) |
| self.assertEqual(size_1, size_2) |
| |
| class ContentFileTestCase(unittest.TestCase): |
| """ |
| Test that the constructor of ContentFile accepts 'name' (#16590). |
| """ |
| def test_content_file_default_name(self): |
| self.assertEqual(ContentFile("content").name, None) |
| |
| def test_content_file_custome_name(self): |
| name = "I can have a name too!" |
| self.assertEqual(ContentFile("content", name=name).name, name) |