# blob: bd170258f52c00e389741e676a7607702c1100df [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import shutil
import stat
import sys
import tempfile
import unittest
import uuid
import zipfile
import mock
from dependency_manager import dependency_manager_util
from dependency_manager import exceptions
class DependencyManagerUtilTest(unittest.TestCase):
  """Exercises dependency_manager_util against real files on disk.

  setUp builds a temporary directory tree holding a read-only, a writable and
  an executable file, both at the top level and inside a sub-directory, and
  zips that tree. The tests then remove, unzip and verify that tree/archive.
  """
  # This class intentionally uses actual file I/O to test real system behavior.

  def _CreateFile(self, path, contents, mode):
    """Writes |contents| to |path|, then chmods the file to |mode|."""
    with open(path, 'w') as new_file:
      new_file.write(contents)
    os.chmod(path, mode)

  def setUp(self):
    """Creates the fixture directory tree and a zip archive of it."""
    self.tmp_dir = os.path.abspath(tempfile.mkdtemp(prefix='telemetry'))
    self.sub_dir = os.path.join(self.tmp_dir, 'sub_dir')
    os.mkdir(self.sub_dir)

    read_write = stat.S_IRUSR | stat.S_IWUSR

    self.read_only_path = os.path.join(self.tmp_dir, 'read_only')
    self._CreateFile(self.read_only_path, 'Read-only file', stat.S_IRUSR)
    self.writable_path = os.path.join(self.tmp_dir, 'writable')
    self._CreateFile(self.writable_path, 'Writable file', read_write)
    self.executable_path = os.path.join(self.tmp_dir, 'executable')
    self._CreateFile(self.executable_path, 'Executable file', stat.S_IRWXU)

    self.sub_read_only_path = os.path.join(self.sub_dir, 'read_only')
    self._CreateFile(
        self.sub_read_only_path, 'Read-only sub file', stat.S_IRUSR)
    self.sub_writable_path = os.path.join(self.sub_dir, 'writable')
    self._CreateFile(self.sub_writable_path, 'Writable sub file', read_write)
    self.sub_executable_path = os.path.join(self.sub_dir, 'executable')
    self._CreateFile(
        self.sub_executable_path, 'Executable sub file', stat.S_IRWXU)

    self.AssertExpectedDirFiles(self.tmp_dir)
    self.archive_path = self.CreateZipArchiveFromDir(self.tmp_dir)

  def tearDown(self):
    """Removes the fixture directory and its archive if they still exist."""
    try:
      if os.path.isdir(self.tmp_dir):
        dependency_manager_util.RemoveDir(self.tmp_dir)
    finally:
      # Delete the archive even when removing the directory failed, so a
      # failing test does not leave zip files behind in the temp directory.
      if os.path.isfile(self.archive_path):
        os.remove(self.archive_path)

  def AssertExpectedDirFiles(self, top_dir):
    """Asserts that |top_dir| has the layout and permissions setUp creates.

    Args:
      top_dir: Directory expected to contain 'read_only', 'writable' and
          'executable' files plus a 'sub_dir' directory with the same three
          files.
    """
    sub_dir = os.path.join(top_dir, 'sub_dir')
    parents = (top_dir, sub_dir)

    def PermissionBits(path):
      return stat.S_IMODE(os.stat(path).st_mode)

    # assert contents as expected
    self.assertTrue(os.path.isdir(top_dir), top_dir)
    self.assertTrue(os.path.isdir(sub_dir), sub_dir)
    for parent in parents:
      for name in ('read_only', 'writable', 'executable'):
        file_path = os.path.join(parent, name)
        self.assertTrue(os.path.isfile(file_path), file_path)

    # assert permissions as expected
    for parent in parents:
      read_only_path = os.path.join(parent, 'read_only')
      self.assertTrue(
          stat.S_IRUSR & PermissionBits(read_only_path), read_only_path)
    for parent in parents:
      writable_path = os.path.join(parent, 'writable')
      self.assertTrue(
          stat.S_IRUSR & PermissionBits(writable_path), writable_path)
      self.assertTrue(
          stat.S_IWUSR & PermissionBits(writable_path), writable_path)
    # The owner-execute bits are only checked on non-Windows platforms.
    if not sys.platform.startswith('win'):
      for parent in parents:
        executable_path = os.path.join(parent, 'executable')
        self.assertEqual(
            stat.S_IRWXU, stat.S_IRWXU & PermissionBits(executable_path),
            executable_path)

  def CreateZipArchiveFromDir(self, dir_path):
    """Zips |dir_path| into a uniquely named archive in the temp directory.

    Args:
      dir_path: Directory whose contents are archived.

    Returns:
      The path of the created zip archive, as returned by
      shutil.make_archive.

    Raises:
      Whatever shutil.make_archive or the sanity assertions raise. Any
      archive file written before the failure is deleted first.
    """
    base_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
    # Bound before the try block so the cleanup below never hits an unbound
    # name when make_archive itself raises. make_archive appends the
    # format-specific extension to the base name.
    archive_path = base_path + '.zip'
    try:
      archive_path = shutil.make_archive(base_path, 'zip', dir_path)
      self.assertTrue(os.path.exists(archive_path))
      self.assertTrue(zipfile.is_zipfile(archive_path))
    except BaseException:
      # Always re-raised: this only removes a partial or invalid archive.
      if os.path.isfile(archive_path):
        os.remove(archive_path)
      raise
    return archive_path

  def testRemoveDirWithSubDir(self):
    # RemoveDir must delete the whole tree, including the read-only files.
    dependency_manager_util.RemoveDir(self.tmp_dir)

    self.assertFalse(os.path.exists(self.tmp_dir))
    self.assertFalse(os.path.exists(self.sub_dir))
    self.assertFalse(os.path.exists(self.read_only_path))
    self.assertFalse(os.path.exists(self.writable_path))
    self.assertFalse(os.path.isfile(self.executable_path))
    self.assertFalse(os.path.exists(self.sub_read_only_path))
    self.assertFalse(os.path.exists(self.sub_writable_path))
    self.assertFalse(os.path.isfile(self.sub_executable_path))

  def testUnzipFile(self):
    self.AssertExpectedDirFiles(self.tmp_dir)
    unzip_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
    try:
      dependency_manager_util.UnzipArchive(self.archive_path, unzip_path)
      self.AssertExpectedDirFiles(unzip_path)
      # Unzipping must leave the original directory untouched.
      self.AssertExpectedDirFiles(self.tmp_dir)
    finally:
      # Clean up the extracted tree even when an assertion above failed.
      if os.path.exists(unzip_path):
        dependency_manager_util.RemoveDir(unzip_path)

  def testUnzipFileContainingLongPath(self):
    dir_path = self.tmp_dir
    if sys.platform.startswith('win'):
      dir_path = u'\\\\?\\' + dir_path

    archive_suffix = ''
    # 260 is the Windows API path length limit.
    while len(archive_suffix) < 260:
      archive_suffix = os.path.join(archive_suffix, 'really')
    contents_dir_path = os.path.join(dir_path, archive_suffix)
    os.makedirs(contents_dir_path)
    filename = os.path.join(contents_dir_path, 'longpath.txt')
    with open(filename, 'a'):
      pass  # Only needs to create an empty file.

    archive_path = self.CreateZipArchiveFromDir(dir_path)
    unzip_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
    try:
      dependency_manager_util.UnzipArchive(archive_path, unzip_path)
    finally:
      # This archive is separate from self.archive_path, so tearDown will not
      # delete it.
      if os.path.isfile(archive_path):
        os.remove(archive_path)
      if os.path.exists(unzip_path):
        dependency_manager_util.RemoveDir(unzip_path)

  def testUnzipFileFailure(self):
    # zipfile is not used on MacOS. See crbug.com/700097.
    if sys.platform.startswith('darwin'):
      return
    unzip_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
    self.assertFalse(os.path.exists(unzip_path))
    with mock.patch(
        'dependency_manager.dependency_manager_util.zipfile.ZipFile.extractall'  # pylint: disable=line-too-long
    ) as zipfile_mock:
      zipfile_mock.side_effect = IOError
      self.assertRaises(
          IOError, dependency_manager_util.UnzipArchive, self.archive_path,
          unzip_path)
    # A failed extraction must not alter the source tree or leave a
    # destination directory behind.
    self.AssertExpectedDirFiles(self.tmp_dir)
    self.assertFalse(os.path.exists(unzip_path))

  def testVerifySafeArchivePasses(self):
    with zipfile.ZipFile(self.archive_path) as archive:
      dependency_manager_util.VerifySafeArchive(archive)

  def testVerifySafeArchiveFailsOnRelativePathWithPardir(self):
    # NamedTemporaryFile opens in binary mode by default, so write bytes.
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
      tmp_file_name = tmp_file.name
      tmp_file.write(b'Bad file!')
    try:
      # Overwrites the fixture archive with one whose only member has an
      # archive name that climbs out of the extraction directory.
      with zipfile.ZipFile(self.archive_path, 'w') as archive:
        archive.write(tmp_file_name, '../../foo')
        self.assertRaises(
            exceptions.ArchiveError, dependency_manager_util.VerifySafeArchive,
            archive)
    finally:
      # delete=False means the temp file has to be removed explicitly.
      os.remove(tmp_file_name)