| #!/usr/bin/env python |
| # test_ftpd.py |
| |
| # ====================================================================== |
| # Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com> |
| # |
| # All Rights Reserved |
| # |
| # Permission to use, copy, modify, and distribute this software and |
| # its documentation for any purpose and without fee is hereby |
| # granted, provided that the above copyright notice appear in all |
| # copies and that both that copyright notice and this permission |
| # notice appear in supporting documentation, and that the name of |
| # Giampaolo Rodola' not be used in advertising or publicity pertaining to |
| # distribution of the software without specific, written prior |
| # permission. |
| # |
| # Giampaolo Rodola' DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, |
| # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN |
| # NO EVENT Giampaolo Rodola' BE LIABLE FOR ANY SPECIAL, INDIRECT OR |
| # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS |
| # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, |
| # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN |
| # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. |
| # ====================================================================== |
| |
| |
| # This test suite has been run successfully on the following systems: |
| # |
| # ----------------------------------------------------------- |
| # System | Python version |
| # ----------------------------------------------------------- |
| # Linux Ubuntu 2.6.20-15 | 2.4, 2.5 |
| # Linux Kubuntu 8.04 32 & 64 bits | 2.5.2 |
| # Linux Debian 2.4.27-2-386 | 2.3.5 |
| # Windows XP prof SP3 | 2.3, 2.4, 2.5, 2.6-RC2 |
| # Windows Vista Ultimate 64 bit | 2.5.1 |
| # Windows Vista Business 32 bit | 2.5.1 |
| # Windows Server 2008 64bit | 2.5.1 |
| # Windows Mobile 6.1 | PythonCE 2.5 |
| # OS X 10.4.10 | 2.3, 2.4, 2.5 |
| # FreeBSD 7.0 | 2.4, 2.5 |
| # ----------------------------------------------------------- |
| |
| |
| import threading |
| import unittest |
| import socket |
| import os |
| import time |
| import re |
| import tempfile |
| import ftplib |
| import random |
| import warnings |
| import sys |
| |
| from pyftpdlib import ftpserver |
| |
| |
| __release__ = 'pyftpdlib 0.5.0' |
| |
| # Attempt to use IP rather than hostname (test suite will run a lot faster) |
| try: |
| HOST = socket.gethostbyname('localhost') |
| except socket.error: |
| HOST = 'localhost' |
| USER = 'user' |
| PASSWD = '12345' |
| HOME = os.getcwd() |
| try: |
| from test.test_support import TESTFN |
| except ImportError: |
| TESTFN = 'temp-fname' |
| TESTFN2 = TESTFN + '2' |
| TESTFN3 = TESTFN + '3' |
| |
| def try_address(host, port=0): |
| """Try to bind a daemon on the given host:port and return True |
| if that has been possible.""" |
| try: |
| ftpserver.FTPServer((host, port), None) |
| except socket.error: |
| return False |
| else: |
| return True |
| |
| SUPPORTS_IPV4 = try_address('127.0.0.1') |
| SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1') |
| |
| |
| class TestAbstractedFS(unittest.TestCase): |
| """Test for conversion utility methods of AbstractedFS class.""" |
| |
| def test_ftpnorm(self): |
| # Tests for ftpnorm method. |
| ae = self.assertEquals |
| fs = ftpserver.AbstractedFS() |
| |
| fs.cwd = '/' |
| ae(fs.ftpnorm(''), '/') |
| ae(fs.ftpnorm('/'), '/') |
| ae(fs.ftpnorm('.'), '/') |
| ae(fs.ftpnorm('..'), '/') |
| ae(fs.ftpnorm('a'), '/a') |
| ae(fs.ftpnorm('/a'), '/a') |
| ae(fs.ftpnorm('/a/'), '/a') |
| ae(fs.ftpnorm('a/..'), '/') |
| ae(fs.ftpnorm('a/b'), '/a/b') |
| ae(fs.ftpnorm('a/b/..'), '/a') |
| ae(fs.ftpnorm('a/b/../..'), '/') |
| fs.cwd = '/sub' |
| ae(fs.ftpnorm(''), '/sub') |
| ae(fs.ftpnorm('/'), '/') |
| ae(fs.ftpnorm('.'), '/sub') |
| ae(fs.ftpnorm('..'), '/') |
| ae(fs.ftpnorm('a'), '/sub/a') |
| ae(fs.ftpnorm('a/'), '/sub/a') |
| ae(fs.ftpnorm('a/..'), '/sub') |
| ae(fs.ftpnorm('a/b'), '/sub/a/b') |
| ae(fs.ftpnorm('a/b/'), '/sub/a/b') |
| ae(fs.ftpnorm('a/b/..'), '/sub/a') |
| ae(fs.ftpnorm('a/b/../..'), '/sub') |
| ae(fs.ftpnorm('a/b/../../..'), '/') |
| ae(fs.ftpnorm('//'), '/') # UNC paths must be collapsed |
| |
| def test_ftp2fs(self): |
| # Tests for ftp2fs method. |
| ae = self.assertEquals |
| fs = ftpserver.AbstractedFS() |
| join = lambda x, y: os.path.join(x, y.replace('/', os.sep)) |
| |
| def goforit(root): |
| fs.root = root |
| fs.cwd = '/' |
| ae(fs.ftp2fs(''), root) |
| ae(fs.ftp2fs('/'), root) |
| ae(fs.ftp2fs('.'), root) |
| ae(fs.ftp2fs('..'), root) |
| ae(fs.ftp2fs('a'), join(root, 'a')) |
| ae(fs.ftp2fs('/a'), join(root, 'a')) |
| ae(fs.ftp2fs('/a/'), join(root, 'a')) |
| ae(fs.ftp2fs('a/..'), root) |
| ae(fs.ftp2fs('a/b'), join(root, r'a/b')) |
| ae(fs.ftp2fs('/a/b'), join(root, r'a/b')) |
| ae(fs.ftp2fs('/a/b/..'), join(root, 'a')) |
| ae(fs.ftp2fs('/a/b/../..'), root) |
| fs.cwd = '/sub' |
| ae(fs.ftp2fs(''), join(root, 'sub')) |
| ae(fs.ftp2fs('/'), root) |
| ae(fs.ftp2fs('.'), join(root, 'sub')) |
| ae(fs.ftp2fs('..'), root) |
| ae(fs.ftp2fs('a'), join(root, 'sub/a')) |
| ae(fs.ftp2fs('a/'), join(root, 'sub/a')) |
| ae(fs.ftp2fs('a/..'), join(root, 'sub')) |
| ae(fs.ftp2fs('a/b'), join(root, 'sub/a/b')) |
| ae(fs.ftp2fs('a/b/..'), join(root, 'sub/a')) |
| ae(fs.ftp2fs('a/b/../..'), join(root, 'sub')) |
| ae(fs.ftp2fs('a/b/../../..'), root) |
| ae(fs.ftp2fs('//a'), join(root, 'a')) # UNC paths must be collapsed |
| |
| if os.sep == '\\': |
| goforit(r'C:\dir') |
| goforit('C:\\') |
| # on DOS-derived filesystems (e.g. Windows) this is the same |
| # as specifying the current drive directory (e.g. 'C:\\') |
| goforit('\\') |
| elif os.sep == '/': |
| goforit('/home/user') |
| goforit('/') |
| else: |
| # os.sep == ':'? Don't know... let's try it anyway |
| goforit(os.getcwd()) |
| |
| def test_fs2ftp(self): |
| # Tests for fs2ftp method. |
| ae = self.assertEquals |
| fs = ftpserver.AbstractedFS() |
| join = lambda x, y: os.path.join(x, y.replace('/', os.sep)) |
| |
| def goforit(root): |
| fs.root = root |
| ae(fs.fs2ftp(root), '/') |
| ae(fs.fs2ftp(join(root, '/')), '/') |
| ae(fs.fs2ftp(join(root, '.')), '/') |
| ae(fs.fs2ftp(join(root, '..')), '/') # can't escape from root |
| ae(fs.fs2ftp(join(root, 'a')), '/a') |
| ae(fs.fs2ftp(join(root, 'a/')), '/a') |
| ae(fs.fs2ftp(join(root, 'a/..')), '/') |
| ae(fs.fs2ftp(join(root, 'a/b')), '/a/b') |
| ae(fs.fs2ftp(join(root, 'a/b')), '/a/b') |
| ae(fs.fs2ftp(join(root, 'a/b/..')), '/a') |
| ae(fs.fs2ftp(join(root, '/a/b/../..')), '/') |
| fs.cwd = '/sub' |
| ae(fs.fs2ftp(join(root, 'a/')), '/a') |
| |
| if os.sep == '\\': |
| goforit(r'C:\dir') |
| goforit('C:\\') |
| # on DOS-derived filesystems (e.g. Windows) this is the same |
| # as specifying the current drive directory (e.g. 'C:\\') |
| goforit('\\') |
| fs.root = r'C:\dir' |
| ae(fs.fs2ftp('C:\\'), '/') |
| ae(fs.fs2ftp('D:\\'), '/') |
| ae(fs.fs2ftp('D:\\dir'), '/') |
| elif os.sep == '/': |
| goforit('/') |
| assert os.path.realpath('/__home/user') == '/__home/user', \ |
| 'Test skipped (symlinks not allowed).' |
| goforit('/__home/user') |
| fs.root = '/__home/user' |
| ae(fs.fs2ftp('/__home'), '/') |
| ae(fs.fs2ftp('/'), '/') |
| ae(fs.fs2ftp('/__home/userx'), '/') |
| else: |
| # os.sep == ':'? Don't know... let's try it anyway |
| goforit(os.getcwd()) |
| |
| def test_validpath(self): |
| # Tests for validpath method. |
| fs = ftpserver.AbstractedFS() |
| fs.root = HOME |
| self.failUnless(fs.validpath(HOME)) |
| self.failUnless(fs.validpath(HOME + '/')) |
| self.failIf(fs.validpath(HOME + 'xxx')) |
| |
| if hasattr(os, 'symlink'): |
| # Tests for validpath on systems supporting symbolic links. |
| |
| def _safe_remove(self, path): |
| # convenience function for removing temporary files |
| try: |
| os.remove(path) |
| except os.error: |
| pass |
| |
| def test_validpath_validlink(self): |
| # Test validpath by issuing a symlink pointing to a path |
| # inside the root directory. |
| fs = ftpserver.AbstractedFS() |
| fs.root = HOME |
| try: |
| open(TESTFN, 'w') |
| os.symlink(TESTFN, TESTFN2) |
| self.failUnless(fs.validpath(TESTFN)) |
| finally: |
| self._safe_remove(TESTFN) |
| self._safe_remove(TESTFN2) |
| |
| def test_validpath_external_symlink(self): |
| # Test validpath by issuing a symlink pointing to a path |
| # outside the root directory. |
| fs = ftpserver.AbstractedFS() |
| fs.root = HOME |
| try: |
| # tempfile should create our file in /tmp directory |
| # which should be outside the user root. If it is not |
| # we just skip the test. |
| file = tempfile.NamedTemporaryFile() |
| if HOME == os.path.dirname(file.name): |
| return |
| os.symlink(file.name, TESTFN) |
| self.failIf(fs.validpath(TESTFN)) |
| finally: |
| self._safe_remove(TESTFN) |
| file.close() |
| |
| |
| class TestDummyAuthorizer(unittest.TestCase): |
| """Tests for DummyAuthorizer class.""" |
| |
| # temporarily change warnings to exceptions for the purposes of testing |
| def setUp(self): |
| self.tempdir = tempfile.mkdtemp(dir=HOME) |
| self.subtempdir = tempfile.mkdtemp(dir=os.path.join(HOME, self.tempdir)) |
| self.tempfile = open(os.path.join(self.tempdir, TESTFN), 'w').name |
| self.subtempfile = open(os.path.join(self.subtempdir, TESTFN), 'w').name |
| warnings.filterwarnings("error") |
| |
| def tearDown(self): |
| os.remove(self.tempfile) |
| os.remove(self.subtempfile) |
| os.rmdir(self.subtempdir) |
| os.rmdir(self.tempdir) |
| warnings.resetwarnings() |
| |
| def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs): |
| try: |
| callableObj(*args, **kwargs) |
| except excClass, why: |
| if str(why) == msg: |
| return |
| raise self.failureException("%s != %s" %(str(why), msg)) |
| else: |
| if hasattr(excClass,'__name__'): excName = excClass.__name__ |
| else: excName = str(excClass) |
| raise self.failureException, "%s not raised" % excName |
| |
| def test_common_methods(self): |
| auth = ftpserver.DummyAuthorizer() |
| # create user |
| auth.add_user(USER, PASSWD, HOME) |
| auth.add_anonymous(HOME) |
| # check credentials |
| self.failUnless(auth.validate_authentication(USER, PASSWD)) |
| self.failIf(auth.validate_authentication(USER, 'wrongpwd')) |
| # remove them |
| auth.remove_user(USER) |
| auth.remove_user('anonymous') |
| # raise exc if user does not exists |
| self.assertRaises(KeyError, auth.remove_user, USER) |
| # raise exc if path does not exist |
| self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 'No such directory: "%s"' %'?:\\', |
| auth.add_user, USER, PASSWD, '?:\\') |
| self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 'No such directory: "%s"' %'?:\\', |
| auth.add_anonymous, '?:\\') |
| # raise exc if user already exists |
| auth.add_user(USER, PASSWD, HOME) |
| auth.add_anonymous(HOME) |
| self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 'User "%s" already exists' %USER, |
| auth.add_user, USER, PASSWD, HOME) |
| self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 'User "anonymous" already exists', |
| auth.add_anonymous, HOME) |
| auth.remove_user(USER) |
| auth.remove_user('anonymous') |
| # raise on wrong permission |
| self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 'No such permission "?"', |
| auth.add_user, USER, PASSWD, HOME, perm='?') |
| self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 'No such permission "?"', |
| auth.add_anonymous, HOME, perm='?') |
| # expect warning on write permissions assigned to anonymous user |
| for x in "adfmw": |
| self.assertRaisesWithMsg(RuntimeWarning, |
| "Write permissions assigned to anonymous user.", |
| auth.add_anonymous, HOME, perm=x) |
| |
| def test_override_perm_interface(self): |
| auth = ftpserver.DummyAuthorizer() |
| auth.add_user(USER, PASSWD, HOME, perm='elr') |
| # raise exc if user does not exists |
| self.assertRaises(KeyError, auth.override_perm, USER+'w', HOME, 'elr') |
| # raise exc if path does not exist or it's not a directory |
| self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 'No such directory: "%s"' %'?:\\', |
| auth.override_perm, USER, '?:\\', 'elr') |
| self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 'No such directory: "%s"' %self.tempfile, |
| auth.override_perm, USER, self.tempfile, 'elr') |
| # raise on wrong permission |
| self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| 'No such permission "?"', auth.override_perm, |
| USER, HOME, perm='?') |
| # expect warning on write permissions assigned to anonymous user |
| auth.add_anonymous(HOME) |
| for p in "adfmw": |
| self.assertRaisesWithMsg(RuntimeWarning, |
| "Write permissions assigned to anonymous user.", |
| auth.override_perm, 'anonymous', HOME, p) |
| # raise on attempt to override home directory permissions |
| self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| "Can't override home directory permissions", |
| auth.override_perm, USER, HOME, perm='w') |
| # raise on attempt to override a path escaping home directory |
| if os.path.dirname(HOME) != HOME: |
| self.assertRaisesWithMsg(ftpserver.AuthorizerError, |
| "Path escapes user home directory", |
| auth.override_perm, USER, |
| os.path.dirname(HOME), perm='w') |
| # try to re-set an overridden permission |
| auth.override_perm(USER, self.tempdir, perm='w') |
| auth.override_perm(USER, self.tempdir, perm='wr') |
| |
| def test_override_perm_recursive_paths(self): |
| auth = ftpserver.DummyAuthorizer() |
| auth.add_user(USER, PASSWD, HOME, perm='elr') |
| self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False) |
| auth.override_perm(USER, self.tempdir, perm='w', recursive=True) |
| self.assert_(auth.has_perm(USER, 'w', HOME) is False) |
| self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True) |
| self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True) |
| self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is True) |
| self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is True) |
| |
| self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False) |
| self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False) |
| path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile)) |
| self.assert_(auth.has_perm(USER, 'w', path) is False) |
| # test case-sensitiveness |
| if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'): |
| self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True) |
| |
| def test_override_perm_not_recursive_paths(self): |
| auth = ftpserver.DummyAuthorizer() |
| auth.add_user(USER, PASSWD, HOME, perm='elr') |
| self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False) |
| auth.override_perm(USER, self.tempdir, perm='w') |
| self.assert_(auth.has_perm(USER, 'w', HOME) is False) |
| self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True) |
| self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True) |
| self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is False) |
| self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is False) |
| |
| self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False) |
| self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False) |
| path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile)) |
| self.assert_(auth.has_perm(USER, 'w', path) is False) |
| # test case-sensitiveness |
| if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'): |
| self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True) |
| |
| |
| class TestCallLater(unittest.TestCase): |
| """Tests for CallLater class.""" |
| |
| def setUp(self): |
| for task in ftpserver._tasks: |
| if not task.cancelled: |
| task.cancel() |
| del ftpserver._tasks[:] |
| |
| def scheduler(self, timeout=0.01, count=100): |
| while ftpserver._tasks and count > 0: |
| ftpserver._scheduler() |
| count -= 1 |
| time.sleep(timeout) |
| |
| def test_interface(self): |
| fun = lambda: 0 |
| self.assertRaises(AssertionError, ftpserver.CallLater, -1, fun) |
| x = ftpserver.CallLater(3, fun) |
| self.assertRaises(AssertionError, x.delay, -1) |
| self.assert_(x.cancelled is False) |
| x.cancel() |
| self.assert_(x.cancelled is True) |
| self.assertRaises(AssertionError, x.call) |
| self.assertRaises(AssertionError, x.reset) |
| self.assertRaises(AssertionError, x.delay, 2) |
| self.assertRaises(AssertionError, x.cancel) |
| |
| def test_order(self): |
| l = [] |
| fun = lambda x: l.append(x) |
| for x in [0.05, 0.04, 0.03, 0.02, 0.01]: |
| ftpserver.CallLater(x, fun, x) |
| self.scheduler() |
| self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) |
| |
| def test_delay(self): |
| l = [] |
| fun = lambda x: l.append(x) |
| ftpserver.CallLater(0.01, fun, 0.01).delay(0.07) |
| ftpserver.CallLater(0.02, fun, 0.02).delay(0.08) |
| ftpserver.CallLater(0.03, fun, 0.03) |
| ftpserver.CallLater(0.04, fun, 0.04) |
| ftpserver.CallLater(0.05, fun, 0.05) |
| ftpserver.CallLater(0.06, fun, 0.06).delay(0.001) |
| self.scheduler() |
| self.assertEqual(l, [0.06, 0.03, 0.04, 0.05, 0.01, 0.02]) |
| |
| def test_reset(self): |
| # will fail on such systems where time.time() does not provide |
| # time with a better precision than 1 second. |
| l = [] |
| fun = lambda x: l.append(x) |
| ftpserver.CallLater(0.01, fun, 0.01) |
| ftpserver.CallLater(0.02, fun, 0.02) |
| ftpserver.CallLater(0.03, fun, 0.03) |
| x = ftpserver.CallLater(0.04, fun, 0.04) |
| ftpserver.CallLater(0.05, fun, 0.05) |
| time.sleep(0.1) |
| x.reset() |
| self.scheduler() |
| self.assertEqual(l, [0.01, 0.02, 0.03, 0.05, 0.04]) |
| |
| def test_cancel(self): |
| l = [] |
| fun = lambda x: l.append(x) |
| ftpserver.CallLater(0.01, fun, 0.01).cancel() |
| ftpserver.CallLater(0.02, fun, 0.02) |
| ftpserver.CallLater(0.03, fun, 0.03) |
| ftpserver.CallLater(0.04, fun, 0.04) |
| ftpserver.CallLater(0.05, fun, 0.05).cancel() |
| self.scheduler() |
| self.assertEqual(l, [0.02, 0.03, 0.04]) |
| |
| |
| class TestFtpAuthentication(unittest.TestCase): |
| "test: USER, PASS, REIN." |
| |
| def setUp(self): |
| self.server = FTPd() |
| self.server.start() |
| self.client = ftplib.FTP() |
| self.client.connect(self.server.host, self.server.port) |
| self.f1 = open(TESTFN, 'w+b') |
| self.f2 = open(TESTFN2, 'w+b') |
| |
| def tearDown(self): |
| self.client.close() |
| self.server.stop() |
| if not self.f1.closed: |
| self.f1.close() |
| if not self.f2.closed: |
| self.f2.close() |
| os.remove(TESTFN) |
| os.remove(TESTFN2) |
| |
| def test_auth_ok(self): |
| self.client.login(user=USER, passwd=PASSWD) |
| |
| def test_anon_auth(self): |
| self.client.login(user='anonymous', passwd='anon@') |
| self.client.login(user='AnonYmoUs', passwd='anon@') |
| self.client.login(user='anonymous', passwd='') |
| |
| # Commented after delayed response on wrong credentials has been |
| # introduced because tests take too much to complete. |
| |
| ## def test_auth_failed(self): |
| ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') |
| ## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', PASSWD) |
| ## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', 'wrong') |
| |
| ## def test_max_auth(self): |
| ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') |
| ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') |
| ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') |
| ## # If authentication fails for 3 times ftpd disconnects the |
| ## # client. We can check if that happens by using self.client.sendcmd() |
| ## # on the 'dead' socket object. If socket object is really |
| ## # closed it should be raised a socket.error exception (Windows) |
| ## # or a EOFError exception (Linux). |
| ## self.assertRaises((socket.error, EOFError), self.client.sendcmd, '') |
| |
| def test_rein(self): |
| self.client.login(user=USER, passwd=PASSWD) |
| self.client.sendcmd('rein') |
| # user not authenticated, error response expected |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd') |
| # by logging-in again we should be able to execute a |
| # file-system command |
| self.client.login(user=USER, passwd=PASSWD) |
| self.client.sendcmd('pwd') |
| |
| def test_rein_during_transfer(self): |
| self.client.login(user=USER, passwd=PASSWD) |
| data = 'abcde12345' * 100000 |
| self.f1.write(data) |
| self.f1.close() |
| |
| self.client.voidcmd('TYPE I') |
| conn = self.client.transfercmd('retr ' + TESTFN) |
| rein_sent = 0 |
| while 1: |
| chunk = conn.recv(8192) |
| if not chunk: |
| break |
| self.f2.write(chunk) |
| if not rein_sent: |
| rein_sent = 1 |
| # flush account, error response expected |
| self.client.sendcmd('rein') |
| self.assertRaises(ftplib.error_perm, self.client.dir) |
| |
| # a 226 response is expected once tranfer finishes |
| self.assertEqual(self.client.voidresp()[:3], '226') |
| # account is still flushed, error response is still expected |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, |
| 'size ' + TESTFN) |
| # by logging-in again we should be able to execute a |
| # filesystem command |
| self.client.login(user=USER, passwd=PASSWD) |
| self.client.sendcmd('pwd') |
| self.f2.seek(0) |
| self.assertEqual(hash(data), hash (self.f2.read())) |
| |
| def test_user(self): |
| # Test USER while already authenticated and no transfer |
| # is in progress. |
| self.client.login(user=USER, passwd=PASSWD) |
| self.client.sendcmd('user ' + USER) # authentication flushed |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd') |
| self.client.sendcmd('pass ' + PASSWD) |
| self.client.sendcmd('pwd') |
| |
| def test_user_on_transfer(self): |
| # Test USER while already authenticated and a transfer is |
| # in progress. |
| self.client.login(user=USER, passwd=PASSWD) |
| data = 'abcde12345' * 100000 |
| self.f1.write(data) |
| self.f1.close() |
| |
| self.client.voidcmd('TYPE I') |
| conn = self.client.transfercmd('retr ' + TESTFN) |
| rein_sent = 0 |
| while 1: |
| chunk = conn.recv(8192) |
| if not chunk: |
| break |
| self.f2.write(chunk) |
| # stop transfer while it isn't finished yet |
| if not rein_sent: |
| rein_sent = 1 |
| # flush account, expect an error response |
| self.client.sendcmd('user ' + USER) |
| self.assertRaises(ftplib.error_perm, self.client.dir) |
| |
| # a 226 response is expected once tranfer finishes |
| self.assertEqual(self.client.voidresp()[:3], '226') |
| # account is still flushed, error response is still expected |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd') |
| # by logging-in again we should be able to execute a |
| # filesystem command |
| self.client.sendcmd('pass ' + PASSWD) |
| self.client.sendcmd('pwd') |
| self.f2.seek(0) |
| self.assertEqual(hash(data), hash (self.f2.read())) |
| |
| |
| class TestFtpDummyCmds(unittest.TestCase): |
| "test: TYPE, STRU, MODE, NOOP, SYST, ALLO, HELP" |
| |
| def setUp(self): |
| self.server = FTPd() |
| self.server.start() |
| self.client = ftplib.FTP() |
| self.client.connect(self.server.host, self.server.port) |
| self.client.login(USER, PASSWD) |
| |
| def tearDown(self): |
| self.client.close() |
| self.server.stop() |
| |
| def test_type(self): |
| self.client.sendcmd('type a') |
| self.client.sendcmd('type i') |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'type ?!?') |
| |
| def test_stru(self): |
| self.client.sendcmd('stru f') |
| self.client.sendcmd('stru F') |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stru ?!?') |
| |
| def test_mode(self): |
| self.client.sendcmd('mode s') |
| self.client.sendcmd('mode S') |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mode ?!?') |
| |
| def test_noop(self): |
| self.client.sendcmd('noop') |
| |
| def test_syst(self): |
| self.client.sendcmd('syst') |
| |
| def test_allo(self): |
| self.client.sendcmd('allo x') |
| |
| def test_quit(self): |
| self.client.sendcmd('quit') |
| |
| def test_help(self): |
| self.client.sendcmd('help') |
| cmd = random.choice(ftpserver.proto_cmds.keys()) |
| self.client.sendcmd('help %s' %cmd) |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'help ?!?') |
| |
| def test_rest(self): |
| # test error conditions only; |
| # restored data-transfer is tested later |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest') |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest str') |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest -1') |
| |
| def test_opts_feat(self): |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst bad_fact') |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst type ;') |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts not_mlst') |
| # utility function which used for extracting the MLST "facts" |
| # string from the FEAT response |
| def mlst(): |
| resp = self.client.sendcmd('feat') |
| return re.search(r'^\s*MLST\s+(\S+)$', resp, re.MULTILINE).group(1) |
| # we rely on "type", "perm", "size", and "modify" facts which |
| # are those available on all platforms |
| self.failUnless('type*;perm*;size*;modify*;' in mlst()) |
| self.assertEqual(self.client.sendcmd('opts mlst type;'), '200 MLST OPTS type;') |
| self.assertEqual(self.client.sendcmd('opts mLSt TypE;'), '200 MLST OPTS type;') |
| self.failUnless('type*;perm;size;modify;' in mlst()) |
| |
| self.assertEqual(self.client.sendcmd('opts mlst'), '200 MLST OPTS ') |
| self.failUnless(not '*' in mlst()) |
| |
| self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;'), '200 MLST OPTS ') |
| self.failUnless(not '*' in mlst()) |
| self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;type;'), \ |
| '200 MLST OPTS type;') |
| self.failUnless('type*;perm;size;modify;' in mlst()) |
| |
| |
| class TestFtpCmdsSemantic(unittest.TestCase): |
| |
| arg_cmds = ('allo','appe','dele','eprt','mdtm','mode','mkd','opts','port', |
| 'rest','retr','rmd','rnfr','rnto','size', 'stor', 'stru','type', |
| 'user','xmkd','xrmd') |
| |
| def setUp(self): |
| self.server = FTPd() |
| self.server.start() |
| self.client = ftplib.FTP() |
| self.client.connect(self.server.host, self.server.port) |
| self.client.login(USER, PASSWD) |
| |
| def tearDown(self): |
| self.client.close() |
| self.server.stop() |
| |
| def test_arg_cmds(self): |
| # test commands requiring an argument |
| expected = "501 Syntax error: command needs an argument." |
| for cmd in self.arg_cmds: |
| self.client.putcmd(cmd) |
| resp = self.client.getmultiline() |
| self.assertEqual(resp, expected) |
| |
| def test_no_arg_cmds(self): |
| # test commands accepting no arguments |
| expected = "501 Syntax error: command does not accept arguments." |
| for cmd in ('abor','cdup','feat','noop','pasv','pwd','quit','rein', |
| 'syst','xcup','xpwd'): |
| self.client.putcmd(cmd + ' arg') |
| resp = self.client.getmultiline() |
| self.assertEqual(resp, expected) |
| |
| def test_auth_cmds(self): |
| # test those commands requiring client to be authenticated |
| expected = "530 Log in with USER and PASS first." |
| self.client.sendcmd('rein') |
| for cmd in ftpserver.proto_cmds: |
| cmd = cmd.lower() |
| if cmd in ('feat','help','noop','user','pass','stat','syst','quit'): |
| continue |
| if cmd in self.arg_cmds: |
| cmd = cmd + ' arg' |
| self.client.putcmd(cmd) |
| resp = self.client.getmultiline() |
| self.assertEqual(resp, expected) |
| |
| def test_no_auth_cmds(self): |
| # test those commands that do not require client to be authenticated |
| self.client.sendcmd('rein') |
| for cmd in ('feat','help','noop','stat','syst'): |
| self.client.sendcmd(cmd) |
| # STAT provided with an argument is equal to LIST hence not allowed |
| # if not authenticated |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat /') |
| self.client.sendcmd('quit') |
| |
| |
| class TestFtpFsOperations(unittest.TestCase): |
| "test: PWD, CWD, CDUP, SIZE, RNFR, RNTO, DELE, MKD, RMD, MDTM, STAT" |
| |
| def setUp(self): |
| self.server = FTPd() |
| self.server.start() |
| self.client = ftplib.FTP() |
| self.client.connect(self.server.host, self.server.port) |
| self.client.login(USER, PASSWD) |
| self.tempfile = os.path.basename(open(TESTFN, 'w+b').name) |
| self.tempdir = os.path.basename(tempfile.mktemp(dir=HOME)) |
| os.mkdir(self.tempdir) |
| |
| def tearDown(self): |
| self.client.close() |
| self.server.stop() |
| if os.path.exists(self.tempfile): |
| os.remove(self.tempfile) |
| if os.path.exists(self.tempdir): |
| os.rmdir(self.tempdir) |
| |
| def test_cwd(self): |
| self.client.cwd(self.tempdir) |
| self.assertEqual(self.client.pwd(), '/' + self.tempdir) |
| self.assertRaises(ftplib.error_perm, self.client.cwd, 'subtempdir') |
| # cwd provided with no arguments is supposed to move us to the |
| # root directory |
| self.client.sendcmd('cwd') |
| self.assertEqual(self.client.pwd(), '/') |
| |
| def test_pwd(self): |
| self.assertEqual(self.client.pwd(), '/') |
| self.client.cwd(self.tempdir) |
| self.assertEqual(self.client.pwd(), '/' + self.tempdir) |
| |
| def test_cdup(self): |
| self.client.cwd(self.tempdir) |
| self.assertEqual(self.client.pwd(), '/' + self.tempdir) |
| self.client.sendcmd('cdup') |
| self.assertEqual(self.client.pwd(), '/') |
| # make sure we can't escape from root directory |
| self.client.sendcmd('cdup') |
| self.assertEqual(self.client.pwd(), '/') |
| |
| def test_mkd(self): |
| tempdir = os.path.basename(tempfile.mktemp(dir=HOME)) |
| self.client.mkd(tempdir) |
| # make sure we can't create directories which already exist |
| # (probably not really necessary); |
| # let's use a try/except statement to avoid leaving behind |
| # orphaned temporary directory in the event of a test failure. |
| try: |
| self.client.mkd(tempdir) |
| except ftplib.error_perm: |
| os.rmdir(tempdir) # ok |
| else: |
| self.fail('ftplib.error_perm not raised.') |
| |
| def test_rmd(self): |
| self.client.rmd(self.tempdir) |
| self.assertRaises(ftplib.error_perm, self.client.rmd, self.tempfile) |
| # make sure we can't remove the root directory |
| self.assertRaises(ftplib.error_perm, self.client.rmd, '/') |
| |
| def test_dele(self): |
| self.client.delete(self.tempfile) |
| self.assertRaises(ftplib.error_perm, self.client.delete, self.tempdir) |
| |
| def test_rnfr_rnto(self): |
| # rename file |
| tempname = os.path.basename(tempfile.mktemp(dir=HOME)) |
| self.client.rename(self.tempfile, tempname) |
| self.client.rename(tempname, self.tempfile) |
| # rename dir |
| tempname = os.path.basename(tempfile.mktemp(dir=HOME)) |
| self.client.rename(self.tempdir, tempname) |
| self.client.rename(tempname, self.tempdir) |
| # rnfr/rnto over non-existing paths |
| bogus = os.path.basename(tempfile.mktemp(dir=HOME)) |
| self.assertRaises(ftplib.error_perm, self.client.rename, bogus, '/x') |
| self.assertRaises(ftplib.error_perm, self.client.rename, self.tempfile, '/') |
| # make sure we can't rename root directory |
| self.assertRaises(ftplib.error_perm, self.client.rename, '/', '/x') |
| |
| def test_mdtm(self): |
| self.client.sendcmd('mdtm ' + self.tempfile) |
| # make sure we can't use mdtm against directories |
| try: |
| self.client.sendcmd('mdtm ' + self.tempdir) |
| except ftplib.error_perm, err: |
| self.failUnless("not retrievable" in str(err)) |
| else: |
| self.fail('Exception not raised') |
| |
| def test_size(self): |
| self.client.size(self.tempfile) |
| # make sure we can't use size against directories |
| try: |
| self.client.sendcmd('size ' + self.tempdir) |
| except ftplib.error_perm, err: |
| self.failUnless("not retrievable" in str(err)) |
| else: |
| self.fail('Exception not raised') |
| |
| |
| class TestFtpRetrieveData(unittest.TestCase): |
| "test: RETR, REST, LIST, NLST, argumented STAT" |
| |
| def setUp(self): |
| self.server = FTPd() |
| self.server.start() |
| self.client = ftplib.FTP() |
| self.client.connect(self.server.host, self.server.port) |
| self.client.login(USER, PASSWD) |
| self.f1 = open(TESTFN, 'w+b') |
| self.f2 = open(TESTFN2, 'w+b') |
| |
| def tearDown(self): |
| self.client.close() |
| self.server.stop() |
| if not self.f1.closed: |
| self.f1.close() |
| if not self.f2.closed: |
| self.f2.close() |
| os.remove(TESTFN) |
| os.remove(TESTFN2) |
| |
| def test_retr(self): |
| data = 'abcde12345' * 100000 |
| self.f1.write(data) |
| self.f1.close() |
| self.client.retrbinary("retr " + TESTFN, self.f2.write) |
| self.f2.seek(0) |
| self.assertEqual(hash(data), hash(self.f2.read())) |
| |
| def test_restore_on_retr(self): |
| data = 'abcde12345' * 100000 |
| fname_1 = os.path.basename(self.f1.name) |
| self.f1.write(data) |
| self.f1.close() |
| |
| # look at ftplib.FTP.retrbinary method to understand this mess |
| self.client.voidcmd('TYPE I') |
| conn = self.client.transfercmd('retr ' + fname_1) |
| chunk = conn.recv(len(data) / 2) |
| self.f2.write(chunk) |
| conn.close() |
| # transfer wasn't finished yet so we expect a 426 response |
| self.assertRaises(ftplib.error_temp, self.client.voidresp) |
| |
| # resuming transfer by using a marker value greater than the |
| # file size stored on the server should result in an error |
| # on retr (RFC-1123) |
| file_size = self.client.size(fname_1) |
| self.client.sendcmd('rest %s' %((file_size + 1))) |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'retr ' + fname_1) |
| |
| # test resume |
| self.client.sendcmd('rest %s' %len(chunk)) |
| self.client.retrbinary("retr " + fname_1, self.f2.write) |
| self.f2.seek(0) |
| self.assertEqual(hash(data), hash (self.f2.read())) |
| |
| def _test_listing_cmds(self, cmd): |
| """Tests common to LIST NLST and MLSD commands.""" |
| # assume that no argument has the same meaning of "/" |
| l1 = l2 = [] |
| self.client.retrlines(cmd, l1.append) |
| self.client.retrlines(cmd + ' /', l2.append) |
| self.assertEqual(l1, l2) |
| if cmd.lower() != 'mlsd': |
| # if pathname is a file one line is expected |
| x = [] |
| self.client.retrlines('%s ' %cmd + TESTFN, x.append) |
| self.assertEqual(len(x), 1) |
| self.failUnless(''.join(x).endswith(TESTFN)) |
| # non-existent path, 550 response is expected |
| bogus = os.path.basename(tempfile.mktemp(dir=HOME)) |
| self.assertRaises(ftplib.error_perm, self.client.retrlines, |
| '%s ' %cmd + bogus, lambda x: x) |
| # for an empty directory we excpect that the data channel is |
| # opened anyway and that no data is received |
| x = [] |
| tempdir = os.path.basename(tempfile.mkdtemp(dir=HOME)) |
| try: |
| self.client.retrlines('%s %s' %(cmd, tempdir), x.append) |
| self.assertEqual(x, []) |
| finally: |
| os.rmdir(tempdir) |
| |
| def test_nlst(self): |
| # common tests |
| self._test_listing_cmds('nlst') |
| |
| def test_list(self): |
| # common tests |
| self._test_listing_cmds('list') |
| # known incorrect pathname arguments (e.g. old clients) are |
| # expected to be treated as if pathname would be == '/' |
| l1 = l2 = l3 = l4 = l5 = [] |
| self.client.retrlines('list /', l1.append) |
| self.client.retrlines('list -a', l2.append) |
| self.client.retrlines('list -l', l3.append) |
| self.client.retrlines('list -al', l4.append) |
| self.client.retrlines('list -la', l5.append) |
| tot = (l1, l2, l3, l4, l5) |
| for x in range(len(tot) - 1): |
| self.assertEqual(tot[x], tot[x+1]) |
| |
| def test_mlst(self): |
| # utility function for extracting the line of interest |
| mlstline = lambda cmd: self.client.voidcmd(cmd).split('\n')[1] |
| |
| # the fact set must be preceded by a space |
| self.failUnless(mlstline('mlst').startswith(' ')) |
| # where TVFS is supported, a fully qualified pathname is expected |
| self.failUnless(mlstline('mlst ' + TESTFN).endswith('/' + TESTFN)) |
| self.failUnless(mlstline('mlst').endswith('/')) |
| # assume that no argument has the same meaning of "/" |
| self.assertEqual(mlstline('mlst'), mlstline('mlst /')) |
| # non-existent path |
| bogus = os.path.basename(tempfile.mktemp(dir=HOME)) |
| self.assertRaises(ftplib.error_perm, mlstline, bogus) |
| # test file/dir notations |
| self.failUnless('type=dir' in mlstline('mlst')) |
| self.failUnless('type=file' in mlstline('mlst ' + TESTFN)) |
| # let's add some tests for OPTS command |
| self.client.sendcmd('opts mlst type;') |
| self.assertEqual(mlstline('mlst'), ' type=dir; /') |
| # where no facts are present, two leading spaces before the |
| # pathname are required (RFC-3659) |
| self.client.sendcmd('opts mlst') |
| self.assertEqual(mlstline('mlst'), ' /') |
| |
| def test_mlsd(self): |
| # common tests |
| self._test_listing_cmds('mlsd') |
| dir = os.path.basename(tempfile.mkdtemp(dir=HOME)) |
| try: |
| try: |
| self.client.retrlines('mlsd ' + TESTFN, lambda x: x) |
| except ftplib.error_perm, resp: |
| # if path is a file a 501 response code is expected |
| self.assertEqual(str(resp)[0:3], "501") |
| else: |
| self.fail("Exception not raised") |
| finally: |
| os.rmdir(dir) |
| |
| def test_stat(self): |
| # test STAT provided with argument which is equal to LIST |
| self.client.sendcmd('stat /') |
| self.client.sendcmd('stat ' + TESTFN) |
| self.client.putcmd('stat *') |
| resp = self.client.getmultiline() |
| self.assertEqual(resp, '550 Globbing not supported.') |
| bogus = os.path.basename(tempfile.mktemp(dir=HOME)) |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat ' + bogus) |
| |
| |
| class TestFtpAbort(unittest.TestCase): |
| "test: ABOR" |
| |
| def setUp(self): |
| self.server = FTPd() |
| self.server.start() |
| self.client = ftplib.FTP() |
| self.client.connect(self.server.host, self.server.port) |
| self.client.login(USER, PASSWD) |
| self.f1 = open(TESTFN, 'w+b') |
| self.f2 = open(TESTFN2, 'w+b') |
| |
| def tearDown(self): |
| self.client.close() |
| self.server.stop() |
| if not self.f1.closed: |
| self.f1.close() |
| if not self.f2.closed: |
| self.f2.close() |
| os.remove(self.f1.name) |
| os.remove(self.f2.name) |
| |
| def test_abor_no_data(self): |
| # Case 1: ABOR while no data channel is opened: respond with 225. |
| resp = self.client.sendcmd('ABOR') |
| self.failUnlessEqual('225 No transfer to abort.', resp) |
| |
| def test_abor_pasv(self): |
| # Case 2: user sends a PASV, a data-channel socket is listening |
| # but not connected, and ABOR is sent: close listening data |
| # socket, respond with 225. |
| self.client.makepasv() |
| respcode = self.client.sendcmd('ABOR')[:3] |
| self.failUnlessEqual('225', respcode) |
| |
| def test_abor_port(self): |
| # Case 3: data channel opened with PASV or PORT, but ABOR sent |
| # before a data transfer has been started: close data channel, |
| # respond with 225 |
| self.client.makeport() |
| respcode = self.client.sendcmd('ABOR')[:3] |
| self.failUnlessEqual('225', respcode) |
| |
| def test_abor(self): |
| # Case 4: ABOR while a data transfer on DTP channel is in |
| # progress: close data channel, respond with 426, respond |
| # with 226. |
| data = 'abcde12345' * 100000 |
| self.f1.write(data) |
| self.f1.close() |
| |
| # this ugly loop construct is to simulate an interrupted |
| # transfer since ftplib doesn't like running storbinary() |
| # in a separate thread |
| self.client.voidcmd('TYPE I') |
| conn = self.client.transfercmd('retr ' + TESTFN) |
| chunk = conn.recv(len(data) / 2) |
| # stop transfer while it isn't finished yet |
| self.client.putcmd('ABOR') |
| |
| # transfer isn't finished yet so ftpd should respond with 426 |
| self.assertRaises(ftplib.error_temp, self.client.voidresp) |
| |
| # transfer successfully aborted, so should now respond with a 226 |
| self.failUnlessEqual('226', self.client.voidresp()[:3]) |
| |
| if hasattr(socket, 'MSG_OOB'): |
| def test_oob_abor(self): |
| # Send ABOR by following the RFC-959 directives of sending |
| # Telnet IP/Synch sequence as OOB data. |
| # On some systems like FreeBSD this happened to be a problem |
| # due to a different SO_OOBINLINE behavior. |
| # On some platforms (e.g. Python CE) the test may fail |
| # although the MSG_OOB constant is defined. |
| self.client.sock.sendall(chr(244), socket.MSG_OOB) |
| self.client.sock.sendall(chr(242), socket.MSG_OOB) |
| self.client.sock.sendall('abor\r\n') |
| self.client.sock.settimeout(1) |
| self.assertEqual(self.client.getresp()[:3], '225') |
| |
| |
| class TestFtpStoreData(unittest.TestCase): |
| "test: STOR, STOU, APPE, REST" |
| |
| def setUp(self): |
| self.server = FTPd() |
| self.server.start() |
| self.client = ftplib.FTP() |
| self.client.connect(self.server.host, self.server.port) |
| self.client.login(USER, PASSWD) |
| self.f1 = open(TESTFN, 'w+b') |
| self.f2 = open(TESTFN2, 'w+b') |
| |
| def tearDown(self): |
| self.client.close() |
| self.server.stop() |
| if not self.f1.closed: |
| self.f1.close() |
| if not self.f2.closed: |
| self.f2.close() |
| os.remove(TESTFN) |
| os.remove(TESTFN2) |
| |
| def test_stor(self): |
| # TESTFN3 is the remote file name |
| try: |
| data = 'abcde12345' * 100000 |
| self.f1.write(data) |
| self.f1.seek(0) |
| self.client.storbinary('stor ' + TESTFN3, self.f1) |
| self.client.retrbinary('retr ' + TESTFN3, self.f2.write) |
| self.f2.seek(0) |
| self.assertEqual(hash(data), hash (self.f2.read())) |
| finally: |
| # we do not use os.remove because file could be still |
| # locked by ftpd thread |
| if os.path.exists(TESTFN3): |
| self.client.delete(TESTFN3) |
| |
| def test_stou(self): |
| data = 'abcde12345' * 100000 |
| self.f1.write(data) |
| self.f1.seek(0) |
| |
| self.client.voidcmd('TYPE I') |
| # filename comes in as "1xx FILE: <filename>" |
| filename = self.client.sendcmd('stou').split('FILE: ')[1] |
| try: |
| sock = self.client.makeport() |
| conn, sockaddr = sock.accept() |
| while 1: |
| buf = self.f1.read(8192) |
| if not buf: |
| break |
| conn.sendall(buf) |
| conn.close() |
| # transfer finished, a 226 response is expected |
| self.client.voidresp() |
| self.client.retrbinary('retr ' + filename, self.f2.write) |
| self.f2.seek(0) |
| self.assertEqual(hash(data), hash (self.f2.read())) |
| finally: |
| # we do not use os.remove because file could be |
| # still locked by ftpd thread |
| if os.path.exists(filename): |
| self.client.delete(filename) |
| |
| def test_stou_rest(self): |
| # watch for STOU preceded by REST, which makes no sense. |
| self.client.sendcmd('rest 10') |
| self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'stou') |
| |
| def test_appe(self): |
| # TESTFN3 is the remote file name |
| try: |
| data1 = 'abcde12345' * 100000 |
| self.f1.write(data1) |
| self.f1.seek(0) |
| self.client.storbinary('stor ' + TESTFN3, self.f1) |
| |
| data2 = 'fghil67890' * 100000 |
| self.f1.write(data2) |
| self.f1.seek(self.client.size(TESTFN3)) |
| self.client.storbinary('appe ' + TESTFN3, self.f1) |
| |
| self.client.retrbinary("retr " + TESTFN3, self.f2.write) |
| self.f2.seek(0) |
| self.assertEqual(hash(data1 + data2), hash (self.f2.read())) |
| finally: |
| # we do not use os.remove because file could be still |
| # locked by ftpd thread |
| if os.path.exists(TESTFN3): |
| self.client.delete(TESTFN3) |
| |
| def test_appe_rest(self): |
| # watch for APPE preceded by REST, which makes no sense. |
| self.client.sendcmd('rest 10') |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'appe x') |
| |
| def test_rest_on_stor(self): |
| # TESTFN3 is the remote file name |
| data = 'abcde12345' * 100000 |
| self.f1.write(data) |
| self.f1.seek(0) |
| |
| self.client.voidcmd('TYPE I') |
| conn = self.client.transfercmd('stor ' + TESTFN3) |
| bytes_sent = 0 |
| while 1: |
| chunk = self.f1.read(8192) |
| conn.sendall(chunk) |
| bytes_sent += len(chunk) |
| # stop transfer while it isn't finished yet |
| if bytes_sent >= 524288: # 2^19 |
| break |
| elif not chunk: |
| break |
| conn.close() |
| # transfer wasn't finished yet so we expect a 426 response |
| self.client.voidresp() |
| |
| # resuming transfer by using a marker value greater than the |
| # file size stored on the server should result in an error |
| # on stor |
| file_size = self.client.size(TESTFN3) |
| self.assertEqual(file_size, bytes_sent) |
| self.client.sendcmd('rest %s' %((file_size + 1))) |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stor ' + TESTFN3) |
| |
| self.client.sendcmd('rest %s' %bytes_sent) |
| self.client.storbinary('stor ' + TESTFN3, self.f1) |
| |
| self.client.retrbinary('retr ' + TESTFN3, self.f2.write) |
| self.f1.seek(0) |
| self.f2.seek(0) |
| self.assertEqual(hash(self.f1.read()), hash(self.f2.read())) |
| self.client.delete(TESTFN3) |
| |
| |
| class TestTimeouts(unittest.TestCase): |
| """Test idle-timeout capabilities of control and data channels. |
| Some tests may fail on slow machines. |
| """ |
| |
| def _setUp(self, idle_timeout=300, data_timeout=300, pasv_timeout=30): |
| self.server = FTPd() |
| self.server.handler.timeout = idle_timeout |
| self.server.handler.dtp_handler.timeout = data_timeout |
| self.server.handler.passive_dtp.timeout = pasv_timeout |
| self.server.start() |
| self.client = ftplib.FTP() |
| self.client.connect(self.server.host, self.server.port) |
| self.client.login(USER, PASSWD) |
| |
| def tearDown(self): |
| self.client.close() |
| self.server.handler.timeout = 300 |
| self.server.handler.dtp_handler.timeout = 300 |
| self.server.handler.passive_dtp.timeout = 30 |
| self.server.stop() |
| |
| def test_idle_timeout(self): |
| # Test control channel timeout. The client which does not send |
| # any command within the time specified in FTPHandler.timeout is |
| # supposed to be kicked off. |
| self._setUp(idle_timeout=0.1) |
| # fail if no msg is received within 1 second |
| self.client.sock.settimeout(1) |
| data = self.client.sock.recv(1024) |
| self.assertEqual(data, "421 Control connection timed out.\r\n") |
| # ensure client has been kicked off |
| self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') |
| |
| def test_data_timeout(self): |
| # Test data channel timeout. The client which does not send |
| # or receive any data within the time specified in |
| # DTPHandler.timeout is supposed to be kicked off. |
| self._setUp(data_timeout=0.1) |
| addr = self.client.makepasv() |
| s = socket.socket() |
| s.connect(addr) |
| # fail if no msg is received within 1 second |
| self.client.sock.settimeout(1) |
| data = self.client.sock.recv(1024) |
| self.assertEqual(data, "421 Data connection timed out.\r\n") |
| # ensure client has been kicked off |
| self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') |
| |
| def test_idle_data_timeout1(self): |
| # Tests that the control connection timeout is suspended while |
| # the data channel is opened |
| self._setUp(idle_timeout=0.1, data_timeout=0.2) |
| addr = self.client.makepasv() |
| s = socket.socket() |
| s.connect(addr) |
| # fail if no msg is received within 1 second |
| self.client.sock.settimeout(1) |
| data = self.client.sock.recv(1024) |
| self.assertEqual(data, "421 Data connection timed out.\r\n") |
| # ensure client has been kicked off |
| self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') |
| |
| def test_idle_data_timeout2(self): |
| # Tests that the control connection timeout is restarted after |
| # data channel has been closed |
| self._setUp(idle_timeout=0.1, data_timeout=0.2) |
| addr = self.client.makepasv() |
| s = socket.socket() |
| s.connect(addr) |
| # close data channel |
| self.client.sendcmd('abor') |
| self.client.sock.settimeout(1) |
| data = self.client.sock.recv(1024) |
| self.assertEqual(data, "421 Control connection timed out.\r\n") |
| # ensure client has been kicked off |
| self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') |
| |
| def test_pasv_timeout(self): |
| # Test pasv data channel timeout. The client which does not connect |
| # to the listening data socket within the time specified in |
| # PassiveDTP.timeout is supposed to receive a 421 response. |
| self._setUp(pasv_timeout=0.1) |
| self.client.makepasv() |
| # fail if no msg is received within 1 second |
| self.client.sock.settimeout(1) |
| data = self.client.sock.recv(1024) |
| self.assertEqual(data, "421 Passive data channel timed out.\r\n") |
| # client is not expected to be kicked off |
| self.client.sendcmd('noop') |
| |
| |
| class TestMaxConnections(unittest.TestCase): |
| """Test maximum connections (FTPServer.max_cons).""" |
| |
| def setUp(self): |
| self.server = FTPd() |
| self.server.server.max_cons = 3 |
| self.server.start() |
| |
| def tearDown(self): |
| self.server.server.max_cons = 0 |
| self.server.stop() |
| |
| def test_max_connections(self): |
| c1 = ftplib.FTP() |
| c2 = ftplib.FTP() |
| c3 = ftplib.FTP() |
| try: |
| c1.connect(self.server.host, self.server.port) |
| c2.connect(self.server.host, self.server.port) |
| self.assertRaises(ftplib.error_temp, c3.connect, self.server.host, |
| self.server.port) |
| # with passive data channel established |
| c2.close() |
| c1.login(USER, PASSWD) |
| c1.makepasv() |
| self.assertRaises(ftplib.error_temp, c2.connect, self.server.host, |
| self.server.port) |
| # with passive data socket waiting for connection |
| c1.login(USER, PASSWD) |
| c1.sendcmd('pasv') |
| self.assertRaises(ftplib.error_temp, c2.connect, self.server.host, |
| self.server.port) |
| # with active data channel established |
| c1.login(USER, PASSWD) |
| c1.makeport() |
| self.assertRaises(ftplib.error_temp, c2.connect, self.server.host, |
| self.server.port) |
| finally: |
| c1.close() |
| c2.close() |
| c3.close() |
| |
| |
| class _TestNetworkProtocols(unittest.TestCase): |
| """Test PASV, EPSV, PORT and EPRT commands. |
| |
| Do not use this class directly. Let TestIPv4Environment and |
| TestIPv6Environment classes use it instead. |
| """ |
| HOST = HOST |
| |
| def setUp(self): |
| self.server = FTPd(self.HOST) |
| self.server.start() |
| self.client = ftplib.FTP() |
| self.client.connect(self.server.host, self.server.port) |
| self.client.login(USER, PASSWD) |
| if self.client.af == socket.AF_INET: |
| self.proto = "1" |
| self.other_proto = "2" |
| else: |
| self.proto = "2" |
| self.other_proto = "1" |
| |
| def tearDown(self): |
| self.client.close() |
| self.server.stop() |
| |
| def cmdresp(self, cmd): |
| """Send a command and return response, also if the command failed.""" |
| try: |
| return self.client.sendcmd(cmd) |
| except ftplib.Error, err: |
| return str(err) |
| |
| def test_eprt(self): |
| # test wrong proto |
| try: |
| self.client.sendcmd('eprt |%s|%s|%s|' %(self.other_proto, |
| self.server.host, self.server.port)) |
| except ftplib.error_perm, err: |
| self.assertEqual(str(err)[0:3], "522") |
| else: |
| self.fail("Exception not raised") |
| |
| # test bad args |
| msg = "501 Invalid EPRT format." |
| # len('|') > 3 |
| self.assertEqual(self.cmdresp('eprt ||||'), msg) |
| # len('|') < 3 |
| self.assertEqual(self.cmdresp('eprt ||'), msg) |
| # port > 65535 |
| self.assertEqual(self.cmdresp('eprt |%s|%s|65536|' %(self.proto, |
| self.HOST)), msg) |
| # port < 0 |
| self.assertEqual(self.cmdresp('eprt |%s|%s|-1|' %(self.proto, |
| self.HOST)), msg) |
| # port < 1024 |
| self.assertEqual(self.cmdresp('eprt |%s|%s|222|' %(self.proto, |
| self.HOST)), "501 Can't connect over a privileged port.") |
| |
| # test connection |
| sock = socket.socket(self.client.af, socket.SOCK_STREAM) |
| sock.bind((self.client.sock.getsockname()[0], 0)) |
| sock.listen(5) |
| sock.settimeout(2) |
| ip, port = sock.getsockname()[:2] |
| self.client.sendcmd('eprt |%s|%s|%s|' %(self.proto, ip, port)) |
| try: |
| try: |
| sock.accept() |
| except socket.timeout: |
| self.fail("Server didn't connect to passive socket") |
| finally: |
| sock.close() |
| |
| def test_epsv(self): |
| # test wrong proto |
| try: |
| self.client.sendcmd('epsv ' + self.other_proto) |
| except ftplib.error_perm, err: |
| self.assertEqual(str(err)[0:3], "522") |
| else: |
| self.fail("Exception not raised") |
| |
| # test connection |
| for cmd in ('EPSV', 'EPSV ' + self.proto): |
| host, port = ftplib.parse229(self.client.sendcmd(cmd), |
| self.client.sock.getpeername()) |
| s = socket.socket(self.client.af, socket.SOCK_STREAM) |
| s.settimeout(2) |
| try: |
| s.connect((host, port)) |
| self.client.sendcmd('abor') |
| finally: |
| s.close() |
| |
| def test_epsv_all(self): |
| self.client.sendcmd('epsv all') |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pasv') |
| self.assertRaises(ftplib.error_perm, self.client.sendport, self.HOST, 2000) |
| self.assertRaises(ftplib.error_perm, self.client.sendcmd, |
| 'eprt |%s|%s|%s|' %(self.proto, self.HOST, 2000)) |
| |
| |
| class TestIPv4Environment(_TestNetworkProtocols): |
| """Test PASV, EPSV, PORT and EPRT commands. |
| |
| Runs tests contained in _TestNetworkProtocols class by using IPv4 |
| plus some additional specific tests. |
| """ |
| HOST = '127.0.0.1' |
| |
| def test_port_v4(self): |
| # test connection |
| self.client.makeport() |
| self.client.sendcmd('abor') |
| # test bad arguments |
| ae = self.assertEqual |
| msg = "501 Invalid PORT format." |
| ae(self.cmdresp('port 127,0,0,1,1.1'), msg) # sep != ',' |
| ae(self.cmdresp('port X,0,0,1,1,1'), msg) # value != int |
| ae(self.cmdresp('port 127,0,0,1,1,1,1'), msg) # len(args) > 6 |
| ae(self.cmdresp('port 127,0,0,1'), msg) # len(args) < 6 |
| ae(self.cmdresp('port 256,0,0,1,1,1'), msg) # oct > 255 |
| ae(self.cmdresp('port 127,0,0,1,256,1'), msg) # port > 65535 |
| ae(self.cmdresp('port 127,0,0,1,-1,0'), msg) # port < 0 |
| msg = "501 Can't connect over a privileged port." |
| ae(self.cmdresp('port %s,1,1' %self.HOST.replace('.',',')),msg) # port < 1024 |
| if "1.2.3.4" != self.HOST: |
| msg = "501 Can't connect to a foreign address." |
| ae(self.cmdresp('port 1,2,3,4,4,4'), msg) |
| |
| def test_eprt_v4(self): |
| self.assertEqual(self.cmdresp('eprt |1|0.10.10.10|2222|'), |
| "501 Can't connect to a foreign address.") |
| |
| def test_pasv_v4(self): |
| host, port = ftplib.parse227(self.client.sendcmd('pasv')) |
| s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| s.settimeout(2) |
| try: |
| s.connect((host, port)) |
| finally: |
| s.close() |
| |
| |
| class TestIPv6Environment(_TestNetworkProtocols): |
| """Test PASV, EPSV, PORT and EPRT commands. |
| |
| Runs tests contained in _TestNetworkProtocols class by using IPv6 |
| plus some additional specific tests. |
| """ |
| HOST = '::1' |
| |
| def test_port_v6(self): |
| # 425 expected |
| self.assertRaises(ftplib.error_temp, self.client.sendport, |
| self.server.host, self.server.port) |
| |
| def test_pasv_v6(self): |
| # 425 expected |
| self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'pasv') |
| |
| def test_eprt_v6(self): |
| self.assertEqual(self.cmdresp('eprt |2|::xxx|2222|'), |
| "501 Can't connect to a foreign address.") |
| |
| |
| class FTPd(threading.Thread): |
| """A threaded FTP server used for running tests.""" |
| |
| def __init__(self, host=HOST, port=0, verbose=False): |
| threading.Thread.__init__(self) |
| self.active = False |
| if not verbose: |
| ftpserver.log = ftpserver.logline = lambda x: x |
| self.authorizer = ftpserver.DummyAuthorizer() |
| self.authorizer.add_user(USER, PASSWD, HOME, perm='elradfmw') # full perms |
| self.authorizer.add_anonymous(HOME) |
| self.handler = ftpserver.FTPHandler |
| self.handler.authorizer = self.authorizer |
| self.server = ftpserver.FTPServer((host, port), self.handler) |
| self.host, self.port = self.server.socket.getsockname()[:2] |
| self.active_lock = threading.Lock() |
| |
| def start(self): |
| assert not self.active |
| self.__flag = threading.Event() |
| threading.Thread.start(self) |
| self.__flag.wait() |
| |
| def run(self): |
| self.active = True |
| self.__flag.set() |
| while self.active: |
| self.active_lock.acquire() |
| self.server.serve_forever(timeout=0.001, count=1) |
| self.active_lock.release() |
| self.server.close_all(ignore_all=True) |
| |
| def stop(self): |
| assert self.active |
| self.active = False |
| self.join() |
| |
| |
| def remove_test_files(): |
| "Convenience function for removing temporary test files" |
| for file in [TESTFN, TESTFN2, TESTFN3]: |
| try: |
| os.remove(file) |
| except os.error: |
| pass |
| |
| def test_main(tests=None): |
| test_suite = unittest.TestSuite() |
| if tests is None: |
| tests = [ |
| TestAbstractedFS, |
| TestDummyAuthorizer, |
| TestCallLater, |
| TestFtpAuthentication, |
| TestFtpDummyCmds, |
| TestFtpCmdsSemantic, |
| TestFtpFsOperations, |
| TestFtpRetrieveData, |
| TestFtpAbort, |
| TestFtpStoreData, |
| TestTimeouts, |
| TestMaxConnections |
| ] |
| if SUPPORTS_IPV4: |
| tests.append(TestIPv4Environment) |
| if SUPPORTS_IPV6: |
| tests.append(TestIPv6Environment) |
| |
| for test in tests: |
| test_suite.addTest(unittest.makeSuite(test)) |
| remove_test_files() |
| unittest.TextTestRunner(verbosity=2).run(test_suite) |
| remove_test_files() |
| |
| |
| if __name__ == '__main__': |
| test_main() |