| import os |
| import re |
| import signal |
| import tempfile |
| import threading |
| from datetime import datetime |
| |
| import pytest |
| import requests |
| |
| import docker |
| from .. import helpers |
| from ..helpers import assert_cat_socket_detached_with_keys |
| from ..helpers import ctrl_with |
| from ..helpers import requires_api_version |
| from .base import BaseAPIIntegrationTest |
| from .base import TEST_IMG |
| from docker.constants import IS_WINDOWS_PLATFORM |
| from docker.utils.socket import next_frame_header |
| from docker.utils.socket import read_exactly |
| |
| |
| class ListContainersTest(BaseAPIIntegrationTest): |
| def test_list_containers(self): |
| res0 = self.client.containers(all=True) |
| size = len(res0) |
| res1 = self.client.create_container(TEST_IMG, 'true') |
| assert 'Id' in res1 |
| self.client.start(res1['Id']) |
| self.tmp_containers.append(res1['Id']) |
| res2 = self.client.containers(all=True) |
| assert size + 1 == len(res2) |
| retrieved = [x for x in res2 if x['Id'].startswith(res1['Id'])] |
| assert len(retrieved) == 1 |
| retrieved = retrieved[0] |
| assert 'Command' in retrieved |
| assert retrieved['Command'] == 'true' |
| assert 'Image' in retrieved |
| assert re.search(r'alpine:.*', retrieved['Image']) |
| assert 'Status' in retrieved |
| |
| |
| class CreateContainerTest(BaseAPIIntegrationTest): |
| |
| def test_create(self): |
| res = self.client.create_container(TEST_IMG, 'true') |
| assert 'Id' in res |
| self.tmp_containers.append(res['Id']) |
| |
| def test_create_with_host_pid_mode(self): |
| ctnr = self.client.create_container( |
| TEST_IMG, 'true', host_config=self.client.create_host_config( |
| pid_mode='host', network_mode='none' |
| ) |
| ) |
| assert 'Id' in ctnr |
| self.tmp_containers.append(ctnr['Id']) |
| self.client.start(ctnr) |
| inspect = self.client.inspect_container(ctnr) |
| assert 'HostConfig' in inspect |
| host_config = inspect['HostConfig'] |
| assert 'PidMode' in host_config |
| assert host_config['PidMode'] == 'host' |
| |
| def test_create_with_links(self): |
| res0 = self.client.create_container( |
| TEST_IMG, 'cat', |
| detach=True, stdin_open=True, |
| environment={'FOO': '1'}) |
| |
| container1_id = res0['Id'] |
| self.tmp_containers.append(container1_id) |
| |
| self.client.start(container1_id) |
| |
| res1 = self.client.create_container( |
| TEST_IMG, 'cat', |
| detach=True, stdin_open=True, |
| environment={'FOO': '1'}) |
| |
| container2_id = res1['Id'] |
| self.tmp_containers.append(container2_id) |
| |
| self.client.start(container2_id) |
| |
| # we don't want the first / |
| link_path1 = self.client.inspect_container(container1_id)['Name'][1:] |
| link_alias1 = 'mylink1' |
| link_env_prefix1 = link_alias1.upper() |
| |
| link_path2 = self.client.inspect_container(container2_id)['Name'][1:] |
| link_alias2 = 'mylink2' |
| link_env_prefix2 = link_alias2.upper() |
| |
| res2 = self.client.create_container( |
| TEST_IMG, 'env', host_config=self.client.create_host_config( |
| links={link_path1: link_alias1, link_path2: link_alias2}, |
| network_mode='bridge' |
| ) |
| ) |
| container3_id = res2['Id'] |
| self.tmp_containers.append(container3_id) |
| self.client.start(container3_id) |
| assert self.client.wait(container3_id)['StatusCode'] == 0 |
| |
| logs = self.client.logs(container3_id).decode('utf-8') |
| assert f'{link_env_prefix1}_NAME=' in logs |
| assert f'{link_env_prefix1}_ENV_FOO=1' in logs |
| assert f'{link_env_prefix2}_NAME=' in logs |
| assert f'{link_env_prefix2}_ENV_FOO=1' in logs |
| |
| def test_create_with_restart_policy(self): |
| container = self.client.create_container( |
| TEST_IMG, ['sleep', '2'], |
| host_config=self.client.create_host_config( |
| restart_policy={"Name": "always", "MaximumRetryCount": 0}, |
| network_mode='none' |
| ) |
| ) |
| id = container['Id'] |
| self.client.start(id) |
| self.client.wait(id) |
| with pytest.raises(docker.errors.APIError) as exc: |
| self.client.remove_container(id) |
| err = exc.value.explanation |
| assert 'You cannot remove ' in err |
| self.client.remove_container(id, force=True) |
| |
| def test_create_container_with_volumes_from(self): |
| vol_names = ['foobar_vol0', 'foobar_vol1'] |
| |
| res0 = self.client.create_container( |
| TEST_IMG, 'true', name=vol_names[0] |
| ) |
| container1_id = res0['Id'] |
| self.tmp_containers.append(container1_id) |
| self.client.start(container1_id) |
| |
| res1 = self.client.create_container( |
| TEST_IMG, 'true', name=vol_names[1] |
| ) |
| container2_id = res1['Id'] |
| self.tmp_containers.append(container2_id) |
| self.client.start(container2_id) |
| |
| res = self.client.create_container( |
| TEST_IMG, 'cat', detach=True, stdin_open=True, |
| host_config=self.client.create_host_config( |
| volumes_from=vol_names, network_mode='none' |
| ) |
| ) |
| container3_id = res['Id'] |
| self.tmp_containers.append(container3_id) |
| self.client.start(container3_id) |
| |
| info = self.client.inspect_container(res['Id']) |
| assert len(info['HostConfig']['VolumesFrom']) == len(vol_names) |
| |
| def create_container_readonly_fs(self): |
| ctnr = self.client.create_container( |
| TEST_IMG, ['mkdir', '/shrine'], |
| host_config=self.client.create_host_config( |
| read_only=True, network_mode='none' |
| ) |
| ) |
| assert 'Id' in ctnr |
| self.tmp_containers.append(ctnr['Id']) |
| self.client.start(ctnr) |
| res = self.client.wait(ctnr)['StatusCode'] |
| assert res != 0 |
| |
| def create_container_with_name(self): |
| res = self.client.create_container(TEST_IMG, 'true', name='foobar') |
| assert 'Id' in res |
| self.tmp_containers.append(res['Id']) |
| inspect = self.client.inspect_container(res['Id']) |
| assert 'Name' in inspect |
| assert '/foobar' == inspect['Name'] |
| |
| def create_container_privileged(self): |
| res = self.client.create_container( |
| TEST_IMG, 'true', host_config=self.client.create_host_config( |
| privileged=True, network_mode='none' |
| ) |
| ) |
| assert 'Id' in res |
| self.tmp_containers.append(res['Id']) |
| self.client.start(res['Id']) |
| inspect = self.client.inspect_container(res['Id']) |
| assert 'Config' in inspect |
| assert 'Id' in inspect |
| assert inspect['Id'].startswith(res['Id']) |
| assert 'Image' in inspect |
| assert 'State' in inspect |
| assert 'Running' in inspect['State'] |
| if not inspect['State']['Running']: |
| assert 'ExitCode' in inspect['State'] |
| assert inspect['State']['ExitCode'] == 0 |
| # Since Nov 2013, the Privileged flag is no longer part of the |
| # container's config exposed via the API (safety concerns?). |
| # |
| if 'Privileged' in inspect['Config']: |
| assert inspect['Config']['Privileged'] is True |
| |
| def test_create_with_mac_address(self): |
| mac_address_expected = "02:42:ac:11:00:0a" |
| container = self.client.create_container( |
| TEST_IMG, ['sleep', '60'], mac_address=mac_address_expected) |
| |
| id = container['Id'] |
| |
| self.client.start(container) |
| res = self.client.inspect_container(container['Id']) |
| assert mac_address_expected == res['NetworkSettings']['MacAddress'] |
| |
| self.client.kill(id) |
| |
| @requires_api_version('1.41') |
| def test_create_with_cgroupns(self): |
| host_config = self.client.create_host_config(cgroupns='private') |
| |
| container = self.client.create_container( |
| image=TEST_IMG, |
| command=['sleep', '60'], |
| host_config=host_config, |
| ) |
| self.tmp_containers.append(container) |
| |
| res = self.client.inspect_container(container) |
| assert 'private' == res['HostConfig']['CgroupnsMode'] |
| |
| def test_group_id_ints(self): |
| container = self.client.create_container( |
| TEST_IMG, 'id -G', |
| host_config=self.client.create_host_config(group_add=[1000, 1001]) |
| ) |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| self.client.wait(container) |
| |
| logs = self.client.logs(container).decode('utf-8') |
| groups = logs.strip().split(' ') |
| assert '1000' in groups |
| assert '1001' in groups |
| |
| def test_group_id_strings(self): |
| container = self.client.create_container( |
| TEST_IMG, 'id -G', host_config=self.client.create_host_config( |
| group_add=['1000', '1001'] |
| ) |
| ) |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| self.client.wait(container) |
| |
| logs = self.client.logs(container).decode('utf-8') |
| |
| groups = logs.strip().split(' ') |
| assert '1000' in groups |
| assert '1001' in groups |
| |
| def test_valid_log_driver_and_log_opt(self): |
| log_config = docker.types.LogConfig( |
| type='json-file', |
| config={'max-file': '100'} |
| ) |
| |
| container = self.client.create_container( |
| TEST_IMG, ['true'], |
| host_config=self.client.create_host_config(log_config=log_config) |
| ) |
| self.tmp_containers.append(container['Id']) |
| self.client.start(container) |
| |
| info = self.client.inspect_container(container) |
| container_log_config = info['HostConfig']['LogConfig'] |
| |
| assert container_log_config['Type'] == log_config.type |
| assert container_log_config['Config'] == log_config.config |
| |
| def test_invalid_log_driver_raises_exception(self): |
| log_config = docker.types.LogConfig( |
| type='asdf', |
| config={} |
| ) |
| |
| expected_msgs = [ |
| "logger: no log driver named 'asdf' is registered", |
| "error looking up logging plugin asdf: plugin \"asdf\" not found", |
| ] |
| with pytest.raises(docker.errors.APIError) as excinfo: |
| # raises an internal server error 500 |
| container = self.client.create_container( |
| TEST_IMG, ['true'], host_config=self.client.create_host_config( |
| log_config=log_config |
| ) |
| ) |
| self.client.start(container) |
| |
| assert excinfo.value.explanation in expected_msgs |
| |
| def test_valid_no_log_driver_specified(self): |
| log_config = docker.types.LogConfig( |
| type="", |
| config={'max-file': '100'} |
| ) |
| |
| container = self.client.create_container( |
| TEST_IMG, ['true'], |
| host_config=self.client.create_host_config(log_config=log_config) |
| ) |
| self.tmp_containers.append(container['Id']) |
| self.client.start(container) |
| |
| info = self.client.inspect_container(container) |
| container_log_config = info['HostConfig']['LogConfig'] |
| |
| assert container_log_config['Type'] == "json-file" |
| assert container_log_config['Config'] == log_config.config |
| |
| def test_valid_no_config_specified(self): |
| log_config = docker.types.LogConfig( |
| type="json-file", |
| config=None |
| ) |
| |
| container = self.client.create_container( |
| TEST_IMG, ['true'], |
| host_config=self.client.create_host_config(log_config=log_config) |
| ) |
| self.tmp_containers.append(container['Id']) |
| self.client.start(container) |
| |
| info = self.client.inspect_container(container) |
| container_log_config = info['HostConfig']['LogConfig'] |
| |
| assert container_log_config['Type'] == "json-file" |
| assert container_log_config['Config'] == {} |
| |
| def test_create_with_memory_constraints_with_str(self): |
| ctnr = self.client.create_container( |
| TEST_IMG, 'true', |
| host_config=self.client.create_host_config( |
| memswap_limit='1G', |
| mem_limit='700M' |
| ) |
| ) |
| assert 'Id' in ctnr |
| self.tmp_containers.append(ctnr['Id']) |
| self.client.start(ctnr) |
| inspect = self.client.inspect_container(ctnr) |
| |
| assert 'HostConfig' in inspect |
| host_config = inspect['HostConfig'] |
| for limit in ['Memory', 'MemorySwap']: |
| assert limit in host_config |
| |
| def test_create_with_memory_constraints_with_int(self): |
| ctnr = self.client.create_container( |
| TEST_IMG, 'true', |
| host_config=self.client.create_host_config(mem_swappiness=40) |
| ) |
| assert 'Id' in ctnr |
| self.tmp_containers.append(ctnr['Id']) |
| self.client.start(ctnr) |
| inspect = self.client.inspect_container(ctnr) |
| |
| assert 'HostConfig' in inspect |
| host_config = inspect['HostConfig'] |
| assert 'MemorySwappiness' in host_config |
| |
| def test_create_with_environment_variable_no_value(self): |
| container = self.client.create_container( |
| TEST_IMG, |
| ['echo'], |
| environment={'Foo': None, 'Other': 'one', 'Blank': ''}, |
| ) |
| self.tmp_containers.append(container['Id']) |
| config = self.client.inspect_container(container['Id']) |
| assert 'Foo' in config['Config']['Env'] |
| assert 'Other=one' in config['Config']['Env'] |
| assert 'Blank=' in config['Config']['Env'] |
| |
| @requires_api_version('1.22') |
| def test_create_with_tmpfs(self): |
| tmpfs = { |
| '/tmp1': 'size=3M' |
| } |
| |
| container = self.client.create_container( |
| TEST_IMG, |
| ['echo'], |
| host_config=self.client.create_host_config( |
| tmpfs=tmpfs)) |
| |
| self.tmp_containers.append(container['Id']) |
| config = self.client.inspect_container(container) |
| assert config['HostConfig']['Tmpfs'] == tmpfs |
| |
| @requires_api_version('1.24') |
| def test_create_with_isolation(self): |
| container = self.client.create_container( |
| TEST_IMG, ['echo'], host_config=self.client.create_host_config( |
| isolation='default' |
| ) |
| ) |
| self.tmp_containers.append(container['Id']) |
| config = self.client.inspect_container(container) |
| assert config['HostConfig']['Isolation'] == 'default' |
| |
| @requires_api_version('1.25') |
| def test_create_with_auto_remove(self): |
| host_config = self.client.create_host_config( |
| auto_remove=True |
| ) |
| container = self.client.create_container( |
| TEST_IMG, ['echo', 'test'], host_config=host_config |
| ) |
| self.tmp_containers.append(container['Id']) |
| config = self.client.inspect_container(container) |
| assert config['HostConfig']['AutoRemove'] is True |
| |
| @requires_api_version('1.25') |
| def test_create_with_stop_timeout(self): |
| container = self.client.create_container( |
| TEST_IMG, ['echo', 'test'], stop_timeout=25 |
| ) |
| self.tmp_containers.append(container['Id']) |
| config = self.client.inspect_container(container) |
| assert config['Config']['StopTimeout'] == 25 |
| |
| @requires_api_version('1.24') |
| @pytest.mark.xfail(True, reason='Not supported on most drivers') |
| def test_create_with_storage_opt(self): |
| host_config = self.client.create_host_config( |
| storage_opt={'size': '120G'} |
| ) |
| container = self.client.create_container( |
| TEST_IMG, ['echo', 'test'], host_config=host_config |
| ) |
| self.tmp_containers.append(container) |
| config = self.client.inspect_container(container) |
| assert config['HostConfig']['StorageOpt'] == { |
| 'size': '120G' |
| } |
| |
| @requires_api_version('1.25') |
| def test_create_with_init(self): |
| ctnr = self.client.create_container( |
| TEST_IMG, 'true', |
| host_config=self.client.create_host_config( |
| init=True |
| ) |
| ) |
| self.tmp_containers.append(ctnr['Id']) |
| config = self.client.inspect_container(ctnr) |
| assert config['HostConfig']['Init'] is True |
| |
| @requires_api_version('1.24') |
| @pytest.mark.xfail(not os.path.exists('/sys/fs/cgroup/cpu.rt_runtime_us'), |
| reason='CONFIG_RT_GROUP_SCHED isn\'t enabled') |
| def test_create_with_cpu_rt_options(self): |
| ctnr = self.client.create_container( |
| TEST_IMG, 'true', host_config=self.client.create_host_config( |
| cpu_rt_period=1000, cpu_rt_runtime=500 |
| ) |
| ) |
| self.tmp_containers.append(ctnr) |
| config = self.client.inspect_container(ctnr) |
| assert config['HostConfig']['CpuRealtimeRuntime'] == 500 |
| assert config['HostConfig']['CpuRealtimePeriod'] == 1000 |
| |
| @requires_api_version('1.28') |
| def test_create_with_device_cgroup_rules(self): |
| rule = 'c 7:128 rwm' |
| ctnr = self.client.create_container( |
| TEST_IMG, 'true', host_config=self.client.create_host_config( |
| device_cgroup_rules=[rule] |
| ) |
| ) |
| self.tmp_containers.append(ctnr) |
| config = self.client.inspect_container(ctnr) |
| assert config['HostConfig']['DeviceCgroupRules'] == [rule] |
| |
| def test_create_with_uts_mode(self): |
| container = self.client.create_container( |
| TEST_IMG, ['echo'], host_config=self.client.create_host_config( |
| uts_mode='host' |
| ) |
| ) |
| self.tmp_containers.append(container) |
| config = self.client.inspect_container(container) |
| assert config['HostConfig']['UTSMode'] == 'host' |
| |
| |
| @pytest.mark.xfail( |
| IS_WINDOWS_PLATFORM, reason='Test not designed for Windows platform' |
| ) |
| class VolumeBindTest(BaseAPIIntegrationTest): |
| def setUp(self): |
| super().setUp() |
| |
| self.mount_dest = '/mnt' |
| |
| # Get a random pathname - we don't need it to exist locally |
| self.mount_origin = tempfile.mkdtemp() |
| self.filename = 'shared.txt' |
| |
| self.run_with_volume( |
| False, |
| TEST_IMG, |
| ['touch', os.path.join(self.mount_dest, self.filename)], |
| ) |
| |
| def test_create_with_binds_rw(self): |
| |
| container = self.run_with_volume( |
| False, |
| TEST_IMG, |
| ['ls', self.mount_dest], |
| ) |
| logs = self.client.logs(container).decode('utf-8') |
| assert self.filename in logs |
| inspect_data = self.client.inspect_container(container) |
| self.check_container_data(inspect_data, True) |
| |
| def test_create_with_binds_ro(self): |
| self.run_with_volume( |
| False, |
| TEST_IMG, |
| ['touch', os.path.join(self.mount_dest, self.filename)], |
| ) |
| container = self.run_with_volume( |
| True, |
| TEST_IMG, |
| ['ls', self.mount_dest], |
| ) |
| logs = self.client.logs(container).decode('utf-8') |
| |
| assert self.filename in logs |
| |
| inspect_data = self.client.inspect_container(container) |
| self.check_container_data(inspect_data, False) |
| |
| @requires_api_version('1.30') |
| def test_create_with_mounts(self): |
| mount = docker.types.Mount( |
| type="bind", source=self.mount_origin, target=self.mount_dest |
| ) |
| host_config = self.client.create_host_config(mounts=[mount]) |
| container = self.run_container( |
| TEST_IMG, ['ls', self.mount_dest], |
| host_config=host_config |
| ) |
| assert container |
| logs = self.client.logs(container).decode('utf-8') |
| assert self.filename in logs |
| inspect_data = self.client.inspect_container(container) |
| self.check_container_data(inspect_data, True) |
| |
| @requires_api_version('1.30') |
| def test_create_with_mounts_ro(self): |
| mount = docker.types.Mount( |
| type="bind", source=self.mount_origin, target=self.mount_dest, |
| read_only=True |
| ) |
| host_config = self.client.create_host_config(mounts=[mount]) |
| container = self.run_container( |
| TEST_IMG, ['ls', self.mount_dest], |
| host_config=host_config |
| ) |
| assert container |
| logs = self.client.logs(container).decode('utf-8') |
| assert self.filename in logs |
| inspect_data = self.client.inspect_container(container) |
| self.check_container_data(inspect_data, False) |
| |
| @requires_api_version('1.30') |
| def test_create_with_volume_mount(self): |
| mount = docker.types.Mount( |
| type="volume", source=helpers.random_name(), |
| target=self.mount_dest, labels={'com.dockerpy.test': 'true'} |
| ) |
| host_config = self.client.create_host_config(mounts=[mount]) |
| container = self.client.create_container( |
| TEST_IMG, ['true'], host_config=host_config, |
| ) |
| assert container |
| inspect_data = self.client.inspect_container(container) |
| assert 'Mounts' in inspect_data |
| filtered = list(filter( |
| lambda x: x['Destination'] == self.mount_dest, |
| inspect_data['Mounts'] |
| )) |
| assert len(filtered) == 1 |
| mount_data = filtered[0] |
| assert mount['Source'] == mount_data['Name'] |
| assert mount_data['RW'] is True |
| |
| def check_container_data(self, inspect_data, rw): |
| assert 'Mounts' in inspect_data |
| filtered = list(filter( |
| lambda x: x['Destination'] == self.mount_dest, |
| inspect_data['Mounts'] |
| )) |
| assert len(filtered) == 1 |
| mount_data = filtered[0] |
| assert mount_data['Source'] == self.mount_origin |
| assert mount_data['RW'] == rw |
| |
| def run_with_volume(self, ro, *args, **kwargs): |
| return self.run_container( |
| *args, |
| volumes={self.mount_dest: {}}, |
| host_config=self.client.create_host_config( |
| binds={ |
| self.mount_origin: { |
| 'bind': self.mount_dest, |
| 'ro': ro, |
| }, |
| }, |
| network_mode='none' |
| ), |
| **kwargs |
| ) |
| |
| |
| class ArchiveTest(BaseAPIIntegrationTest): |
| def test_get_file_archive_from_container(self): |
| data = 'The Maid and the Pocket Watch of Blood' |
| ctnr = self.client.create_container( |
| TEST_IMG, f'sh -c "echo {data} > /vol1/data.txt"', |
| volumes=['/vol1'] |
| ) |
| self.tmp_containers.append(ctnr) |
| self.client.start(ctnr) |
| self.client.wait(ctnr) |
| with tempfile.NamedTemporaryFile() as destination: |
| strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt') |
| for d in strm: |
| destination.write(d) |
| destination.seek(0) |
| retrieved_data = helpers.untar_file(destination, 'data.txt')\ |
| .decode('utf-8') |
| assert data == retrieved_data.strip() |
| |
| def test_get_file_stat_from_container(self): |
| data = 'The Maid and the Pocket Watch of Blood' |
| ctnr = self.client.create_container( |
| TEST_IMG, f'sh -c "echo -n {data} > /vol1/data.txt"', |
| volumes=['/vol1'] |
| ) |
| self.tmp_containers.append(ctnr) |
| self.client.start(ctnr) |
| self.client.wait(ctnr) |
| strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt') |
| assert 'name' in stat |
| assert stat['name'] == 'data.txt' |
| assert 'size' in stat |
| assert stat['size'] == len(data) |
| |
| def test_copy_file_to_container(self): |
| data = b'Deaf To All But The Song' |
| with tempfile.NamedTemporaryFile(delete=False) as test_file: |
| test_file.write(data) |
| test_file.seek(0) |
| ctnr = self.client.create_container( |
| TEST_IMG, |
| 'cat {}'.format( |
| os.path.join('/vol1/', os.path.basename(test_file.name)) |
| ), |
| volumes=['/vol1'] |
| ) |
| self.tmp_containers.append(ctnr) |
| with helpers.simple_tar(test_file.name) as test_tar: |
| self.client.put_archive(ctnr, '/vol1', test_tar) |
| self.client.start(ctnr) |
| self.client.wait(ctnr) |
| logs = self.client.logs(ctnr) |
| assert logs.strip() == data |
| |
| def test_copy_directory_to_container(self): |
| files = ['a.py', 'b.py', 'foo/b.py'] |
| dirs = ['foo', 'bar'] |
| base = helpers.make_tree(dirs, files) |
| ctnr = self.client.create_container( |
| TEST_IMG, 'ls -p /vol1', volumes=['/vol1'] |
| ) |
| self.tmp_containers.append(ctnr) |
| with docker.utils.tar(base) as test_tar: |
| self.client.put_archive(ctnr, '/vol1', test_tar) |
| self.client.start(ctnr) |
| self.client.wait(ctnr) |
| logs = self.client.logs(ctnr).decode('utf-8') |
| results = logs.strip().split() |
| assert 'a.py' in results |
| assert 'b.py' in results |
| assert 'foo/' in results |
| assert 'bar/' in results |
| |
| |
| class RenameContainerTest(BaseAPIIntegrationTest): |
| def test_rename_container(self): |
| version = self.client.version()['Version'] |
| name = 'hong_meiling' |
| res = self.client.create_container(TEST_IMG, 'true') |
| assert 'Id' in res |
| self.tmp_containers.append(res['Id']) |
| self.client.rename(res, name) |
| inspect = self.client.inspect_container(res['Id']) |
| assert 'Name' in inspect |
| if version == '1.5.0': |
| assert name == inspect['Name'] |
| else: |
| assert f'/{name}' == inspect['Name'] |
| |
| |
| class StartContainerTest(BaseAPIIntegrationTest): |
| def test_start_container(self): |
| res = self.client.create_container(TEST_IMG, 'true') |
| assert 'Id' in res |
| self.tmp_containers.append(res['Id']) |
| self.client.start(res['Id']) |
| inspect = self.client.inspect_container(res['Id']) |
| assert 'Config' in inspect |
| assert 'Id' in inspect |
| assert inspect['Id'].startswith(res['Id']) |
| assert 'Image' in inspect |
| assert 'State' in inspect |
| assert 'Running' in inspect['State'] |
| if not inspect['State']['Running']: |
| assert 'ExitCode' in inspect['State'] |
| assert inspect['State']['ExitCode'] == 0 |
| |
| def test_start_container_with_dict_instead_of_id(self): |
| res = self.client.create_container(TEST_IMG, 'true') |
| assert 'Id' in res |
| self.tmp_containers.append(res['Id']) |
| self.client.start(res) |
| inspect = self.client.inspect_container(res['Id']) |
| assert 'Config' in inspect |
| assert 'Id' in inspect |
| assert inspect['Id'].startswith(res['Id']) |
| assert 'Image' in inspect |
| assert 'State' in inspect |
| assert 'Running' in inspect['State'] |
| if not inspect['State']['Running']: |
| assert 'ExitCode' in inspect['State'] |
| assert inspect['State']['ExitCode'] == 0 |
| |
| def test_run_shlex_commands(self): |
| commands = [ |
| 'true', |
| 'echo "The Young Descendant of Tepes & Septette for the ' |
| 'Dead Princess"', |
| 'echo -n "The Young Descendant of Tepes & Septette for the ' |
| 'Dead Princess"', |
| '/bin/sh -c "echo Hello World"', |
| '/bin/sh -c \'echo "Hello World"\'', |
| 'echo "\"Night of Nights\""', |
| 'true && echo "Night of Nights"' |
| ] |
| for cmd in commands: |
| container = self.client.create_container(TEST_IMG, cmd) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| exitcode = self.client.wait(id)['StatusCode'] |
| assert exitcode == 0, cmd |
| |
| |
| class WaitTest(BaseAPIIntegrationTest): |
| def test_wait(self): |
| res = self.client.create_container(TEST_IMG, ['sleep', '3']) |
| id = res['Id'] |
| self.tmp_containers.append(id) |
| self.client.start(id) |
| exitcode = self.client.wait(id)['StatusCode'] |
| assert exitcode == 0 |
| inspect = self.client.inspect_container(id) |
| assert 'Running' in inspect['State'] |
| assert inspect['State']['Running'] is False |
| assert 'ExitCode' in inspect['State'] |
| assert inspect['State']['ExitCode'] == exitcode |
| |
| def test_wait_with_dict_instead_of_id(self): |
| res = self.client.create_container(TEST_IMG, ['sleep', '3']) |
| id = res['Id'] |
| self.tmp_containers.append(id) |
| self.client.start(res) |
| exitcode = self.client.wait(res)['StatusCode'] |
| assert exitcode == 0 |
| inspect = self.client.inspect_container(res) |
| assert 'Running' in inspect['State'] |
| assert inspect['State']['Running'] is False |
| assert 'ExitCode' in inspect['State'] |
| assert inspect['State']['ExitCode'] == exitcode |
| |
| @requires_api_version('1.30') |
| def test_wait_with_condition(self): |
| ctnr = self.client.create_container(TEST_IMG, 'true') |
| self.tmp_containers.append(ctnr) |
| with pytest.raises(requests.exceptions.ConnectionError): |
| self.client.wait(ctnr, condition='removed', timeout=1) |
| |
| ctnr = self.client.create_container( |
| TEST_IMG, ['sleep', '3'], |
| host_config=self.client.create_host_config(auto_remove=True) |
| ) |
| self.tmp_containers.append(ctnr) |
| self.client.start(ctnr) |
| assert self.client.wait( |
| ctnr, condition='removed', timeout=5 |
| )['StatusCode'] == 0 |
| |
| |
| class LogsTest(BaseAPIIntegrationTest): |
| def test_logs(self): |
| snippet = 'Flowering Nights (Sakuya Iyazoi)' |
| container = self.client.create_container( |
| TEST_IMG, f'echo {snippet}' |
| ) |
| id = container['Id'] |
| self.tmp_containers.append(id) |
| self.client.start(id) |
| exitcode = self.client.wait(id)['StatusCode'] |
| assert exitcode == 0 |
| logs = self.client.logs(id) |
| assert logs == (snippet + '\n').encode(encoding='ascii') |
| |
| def test_logs_tail_option(self): |
| snippet = '''Line1 |
| Line2''' |
| container = self.client.create_container( |
| TEST_IMG, f'echo "{snippet}"' |
| ) |
| id = container['Id'] |
| self.tmp_containers.append(id) |
| self.client.start(id) |
| exitcode = self.client.wait(id)['StatusCode'] |
| assert exitcode == 0 |
| logs = self.client.logs(id, tail=1) |
| assert logs == 'Line2\n'.encode(encoding='ascii') |
| |
| def test_logs_streaming_and_follow(self): |
| snippet = 'Flowering Nights (Sakuya Iyazoi)' |
| container = self.client.create_container( |
| TEST_IMG, f'echo {snippet}' |
| ) |
| id = container['Id'] |
| self.tmp_containers.append(id) |
| self.client.start(id) |
| logs = b'' |
| for chunk in self.client.logs(id, stream=True, follow=True): |
| logs += chunk |
| |
| exitcode = self.client.wait(id)['StatusCode'] |
| assert exitcode == 0 |
| |
| assert logs == (snippet + '\n').encode(encoding='ascii') |
| |
| @pytest.mark.timeout(5) |
| @pytest.mark.skipif(os.environ.get('DOCKER_HOST', '').startswith('ssh://'), |
| reason='No cancellable streams over SSH') |
| def test_logs_streaming_and_follow_and_cancel(self): |
| snippet = 'Flowering Nights (Sakuya Iyazoi)' |
| container = self.client.create_container( |
| TEST_IMG, f'sh -c "echo \\"{snippet}\\" && sleep 3"' |
| ) |
| id = container['Id'] |
| self.tmp_containers.append(id) |
| self.client.start(id) |
| logs = b'' |
| |
| generator = self.client.logs(id, stream=True, follow=True) |
| threading.Timer(1, generator.close).start() |
| |
| for chunk in generator: |
| logs += chunk |
| |
| assert logs == (snippet + '\n').encode(encoding='ascii') |
| |
| def test_logs_with_dict_instead_of_id(self): |
| snippet = 'Flowering Nights (Sakuya Iyazoi)' |
| container = self.client.create_container( |
| TEST_IMG, f'echo {snippet}' |
| ) |
| id = container['Id'] |
| self.tmp_containers.append(id) |
| self.client.start(id) |
| exitcode = self.client.wait(id)['StatusCode'] |
| assert exitcode == 0 |
| logs = self.client.logs(container) |
| assert logs == (snippet + '\n').encode(encoding='ascii') |
| |
| def test_logs_with_tail_0(self): |
| snippet = 'Flowering Nights (Sakuya Iyazoi)' |
| container = self.client.create_container( |
| TEST_IMG, f'echo "{snippet}"' |
| ) |
| id = container['Id'] |
| self.tmp_containers.append(id) |
| self.client.start(id) |
| exitcode = self.client.wait(id)['StatusCode'] |
| assert exitcode == 0 |
| logs = self.client.logs(id, tail=0) |
| assert logs == ''.encode(encoding='ascii') |
| |
| @requires_api_version('1.35') |
| def test_logs_with_until(self): |
| snippet = 'Shanghai Teahouse (Hong Meiling)' |
| container = self.client.create_container( |
| TEST_IMG, f'echo "{snippet}"' |
| ) |
| |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| exitcode = self.client.wait(container)['StatusCode'] |
| assert exitcode == 0 |
| logs_until_1 = self.client.logs(container, until=1) |
| assert logs_until_1 == b'' |
| logs_until_now = self.client.logs(container, datetime.now()) |
| assert logs_until_now == (snippet + '\n').encode(encoding='ascii') |
| |
| |
| class DiffTest(BaseAPIIntegrationTest): |
| def test_diff(self): |
| container = self.client.create_container(TEST_IMG, ['touch', '/test']) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| exitcode = self.client.wait(id)['StatusCode'] |
| assert exitcode == 0 |
| diff = self.client.diff(id) |
| test_diff = [x for x in diff if x.get('Path', None) == '/test'] |
| assert len(test_diff) == 1 |
| assert 'Kind' in test_diff[0] |
| assert test_diff[0]['Kind'] == 1 |
| |
| def test_diff_with_dict_instead_of_id(self): |
| container = self.client.create_container(TEST_IMG, ['touch', '/test']) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| exitcode = self.client.wait(id)['StatusCode'] |
| assert exitcode == 0 |
| diff = self.client.diff(container) |
| test_diff = [x for x in diff if x.get('Path', None) == '/test'] |
| assert len(test_diff) == 1 |
| assert 'Kind' in test_diff[0] |
| assert test_diff[0]['Kind'] == 1 |
| |
| |
| class StopTest(BaseAPIIntegrationTest): |
| def test_stop(self): |
| container = self.client.create_container(TEST_IMG, ['sleep', '9999']) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| self.client.stop(id, timeout=2) |
| container_info = self.client.inspect_container(id) |
| assert 'State' in container_info |
| state = container_info['State'] |
| assert 'Running' in state |
| assert state['Running'] is False |
| |
| def test_stop_with_dict_instead_of_id(self): |
| container = self.client.create_container(TEST_IMG, ['sleep', '9999']) |
| assert 'Id' in container |
| id = container['Id'] |
| self.client.start(container) |
| self.tmp_containers.append(id) |
| self.client.stop(container, timeout=2) |
| container_info = self.client.inspect_container(id) |
| assert 'State' in container_info |
| state = container_info['State'] |
| assert 'Running' in state |
| assert state['Running'] is False |
| |
| |
| class KillTest(BaseAPIIntegrationTest): |
| def test_kill(self): |
| container = self.client.create_container(TEST_IMG, ['sleep', '9999']) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| self.client.kill(id) |
| container_info = self.client.inspect_container(id) |
| assert 'State' in container_info |
| state = container_info['State'] |
| assert 'ExitCode' in state |
| assert state['ExitCode'] != 0 |
| assert 'Running' in state |
| assert state['Running'] is False |
| |
| def test_kill_with_dict_instead_of_id(self): |
| container = self.client.create_container(TEST_IMG, ['sleep', '9999']) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| self.client.kill(container) |
| container_info = self.client.inspect_container(id) |
| assert 'State' in container_info |
| state = container_info['State'] |
| assert 'ExitCode' in state |
| assert state['ExitCode'] != 0 |
| assert 'Running' in state |
| assert state['Running'] is False |
| |
| def test_kill_with_signal(self): |
| id = self.client.create_container(TEST_IMG, ['sleep', '60']) |
| self.tmp_containers.append(id) |
| self.client.start(id) |
| self.client.kill( |
| id, signal=signal.SIGKILL if not IS_WINDOWS_PLATFORM else 9 |
| ) |
| exitcode = self.client.wait(id)['StatusCode'] |
| assert exitcode != 0 |
| container_info = self.client.inspect_container(id) |
| assert 'State' in container_info |
| state = container_info['State'] |
| assert 'ExitCode' in state |
| assert state['ExitCode'] != 0 |
| assert 'Running' in state |
| assert state['Running'] is False, state |
| |
| def test_kill_with_signal_name(self): |
| id = self.client.create_container(TEST_IMG, ['sleep', '60']) |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| self.client.kill(id, signal='SIGKILL') |
| exitcode = self.client.wait(id)['StatusCode'] |
| assert exitcode != 0 |
| container_info = self.client.inspect_container(id) |
| assert 'State' in container_info |
| state = container_info['State'] |
| assert 'ExitCode' in state |
| assert state['ExitCode'] != 0 |
| assert 'Running' in state |
| assert state['Running'] is False, state |
| |
| def test_kill_with_signal_integer(self): |
| id = self.client.create_container(TEST_IMG, ['sleep', '60']) |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| self.client.kill(id, signal=9) |
| exitcode = self.client.wait(id)['StatusCode'] |
| assert exitcode != 0 |
| container_info = self.client.inspect_container(id) |
| assert 'State' in container_info |
| state = container_info['State'] |
| assert 'ExitCode' in state |
| assert state['ExitCode'] != 0 |
| assert 'Running' in state |
| assert state['Running'] is False, state |
| |
| |
| class PortTest(BaseAPIIntegrationTest): |
| def test_port(self): |
| port_bindings = { |
| '1111': ('127.0.0.1', '4567'), |
| '2222': ('127.0.0.1', '4568'), |
| '3333/udp': ('127.0.0.1', '4569'), |
| } |
| ports = [ |
| 1111, |
| 2222, |
| (3333, 'udp'), |
| ] |
| |
| container = self.client.create_container( |
| TEST_IMG, ['sleep', '60'], ports=ports, |
| host_config=self.client.create_host_config( |
| port_bindings=port_bindings, network_mode='bridge' |
| ) |
| ) |
| id = container['Id'] |
| |
| self.client.start(container) |
| |
| # Call the port function on each biding and compare expected vs actual |
| for port in port_bindings: |
| port, _, protocol = port.partition('/') |
| actual_bindings = self.client.port(container, port) |
| port_binding = actual_bindings.pop() |
| |
| ip, host_port = port_binding['HostIp'], port_binding['HostPort'] |
| |
| port_binding = port if not protocol else port + "/" + protocol |
| assert ip == port_bindings[port_binding][0] |
| assert host_port == port_bindings[port_binding][1] |
| |
| self.client.kill(id) |
| |
| |
| class ContainerTopTest(BaseAPIIntegrationTest): |
| @pytest.mark.xfail(reason='Output of docker top depends on host distro, ' |
| 'and is not formalized.') |
| def test_top(self): |
| container = self.client.create_container( |
| TEST_IMG, ['sleep', '60'] |
| ) |
| |
| self.tmp_containers.append(container) |
| |
| self.client.start(container) |
| res = self.client.top(container) |
| if not IS_WINDOWS_PLATFORM: |
| assert res['Titles'] == ['PID', 'USER', 'TIME', 'COMMAND'] |
| assert len(res['Processes']) == 1 |
| assert res['Processes'][0][-1] == 'sleep 60' |
| self.client.kill(container) |
| |
| @pytest.mark.skipif( |
| IS_WINDOWS_PLATFORM, reason='No psargs support on windows' |
| ) |
| @pytest.mark.xfail(reason='Output of docker top depends on host distro, ' |
| 'and is not formalized.') |
| def test_top_with_psargs(self): |
| container = self.client.create_container( |
| TEST_IMG, ['sleep', '60']) |
| |
| self.tmp_containers.append(container) |
| |
| self.client.start(container) |
| res = self.client.top(container, '-eopid,user') |
| assert res['Titles'] == ['PID', 'USER'] |
| assert len(res['Processes']) == 1 |
| assert res['Processes'][0][10] == 'sleep 60' |
| |
| |
| class RestartContainerTest(BaseAPIIntegrationTest): |
| def test_restart(self): |
| container = self.client.create_container(TEST_IMG, ['sleep', '9999']) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| info = self.client.inspect_container(id) |
| assert 'State' in info |
| assert 'StartedAt' in info['State'] |
| start_time1 = info['State']['StartedAt'] |
| self.client.restart(id, timeout=2) |
| info2 = self.client.inspect_container(id) |
| assert 'State' in info2 |
| assert 'StartedAt' in info2['State'] |
| start_time2 = info2['State']['StartedAt'] |
| assert start_time1 != start_time2 |
| assert 'Running' in info2['State'] |
| assert info2['State']['Running'] is True |
| self.client.kill(id) |
| |
| def test_restart_with_low_timeout(self): |
| container = self.client.create_container(TEST_IMG, ['sleep', '9999']) |
| self.client.start(container) |
| self.client.timeout = 3 |
| self.client.restart(container, timeout=1) |
| self.client.timeout = None |
| self.client.restart(container, timeout=1) |
| self.client.kill(container) |
| |
| def test_restart_with_dict_instead_of_id(self): |
| container = self.client.create_container(TEST_IMG, ['sleep', '9999']) |
| assert 'Id' in container |
| id = container['Id'] |
| self.client.start(container) |
| self.tmp_containers.append(id) |
| info = self.client.inspect_container(id) |
| assert 'State' in info |
| assert 'StartedAt' in info['State'] |
| start_time1 = info['State']['StartedAt'] |
| self.client.restart(container, timeout=2) |
| info2 = self.client.inspect_container(id) |
| assert 'State' in info2 |
| assert 'StartedAt' in info2['State'] |
| start_time2 = info2['State']['StartedAt'] |
| assert start_time1 != start_time2 |
| assert 'Running' in info2['State'] |
| assert info2['State']['Running'] is True |
| self.client.kill(id) |
| |
| |
| class RemoveContainerTest(BaseAPIIntegrationTest): |
| def test_remove(self): |
| container = self.client.create_container(TEST_IMG, ['true']) |
| id = container['Id'] |
| self.client.start(id) |
| self.client.wait(id) |
| self.client.remove_container(id) |
| containers = self.client.containers(all=True) |
| res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)] |
| assert len(res) == 0 |
| |
| def test_remove_with_dict_instead_of_id(self): |
| container = self.client.create_container(TEST_IMG, ['true']) |
| id = container['Id'] |
| self.client.start(id) |
| self.client.wait(id) |
| self.client.remove_container(container) |
| containers = self.client.containers(all=True) |
| res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)] |
| assert len(res) == 0 |
| |
| |
| class AttachContainerTest(BaseAPIIntegrationTest): |
| def test_run_container_streaming(self): |
| container = self.client.create_container(TEST_IMG, '/bin/sh', |
| detach=True, stdin_open=True) |
| id = container['Id'] |
| self.tmp_containers.append(id) |
| self.client.start(id) |
| sock = self.client.attach_socket(container, ws=False) |
| assert sock.fileno() > -1 |
| |
| def test_run_container_reading_socket_http(self): |
| line = 'hi there and stuff and things, words!' |
| # `echo` appends CRLF, `printf` doesn't |
| command = f"printf '{line}'" |
| container = self.client.create_container(TEST_IMG, command, |
| detach=True, tty=False) |
| self.tmp_containers.append(container) |
| |
| opts = {"stdout": 1, "stream": 1, "logs": 1} |
| pty_stdout = self.client.attach_socket(container, opts) |
| self.addCleanup(pty_stdout.close) |
| |
| self.client.start(container) |
| |
| (stream, next_size) = next_frame_header(pty_stdout) |
| assert stream == 1 # correspond to stdout |
| assert next_size == len(line) |
| data = read_exactly(pty_stdout, next_size) |
| assert data.decode('utf-8') == line |
| |
| @pytest.mark.xfail(condition=bool(os.environ.get('DOCKER_CERT_PATH', '')), |
| reason='DOCKER_CERT_PATH not respected for websockets') |
| def test_run_container_reading_socket_ws(self): |
| line = 'hi there and stuff and things, words!' |
| # `echo` appends CRLF, `printf` doesn't |
| command = f"printf '{line}'" |
| container = self.client.create_container(TEST_IMG, command, |
| detach=True, tty=False) |
| self.tmp_containers.append(container) |
| |
| opts = {"stdout": 1, "stream": 1, "logs": 1} |
| pty_stdout = self.client.attach_socket(container, opts, ws=True) |
| self.addCleanup(pty_stdout.close) |
| |
| self.client.start(container) |
| |
| data = pty_stdout.recv() |
| assert data.decode('utf-8') == line |
| |
| @pytest.mark.timeout(10) |
| def test_attach_no_stream(self): |
| container = self.client.create_container( |
| TEST_IMG, 'echo hello' |
| ) |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| self.client.wait(container, condition='not-running') |
| output = self.client.attach(container, stream=False, logs=True) |
| assert output == 'hello\n'.encode(encoding='ascii') |
| |
| @pytest.mark.timeout(10) |
| @pytest.mark.skipif(os.environ.get('DOCKER_HOST', '').startswith('ssh://'), |
| reason='No cancellable streams over SSH') |
| @pytest.mark.xfail(condition=os.environ.get('DOCKER_TLS_VERIFY') or |
| os.environ.get('DOCKER_CERT_PATH'), |
| reason='Flaky test on TLS') |
| def test_attach_stream_and_cancel(self): |
| container = self.client.create_container( |
| TEST_IMG, 'sh -c "sleep 2 && echo hello && sleep 60"', |
| tty=True |
| ) |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| output = self.client.attach(container, stream=True, logs=True) |
| |
| threading.Timer(3, output.close).start() |
| |
| lines = [] |
| for line in output: |
| lines.append(line) |
| |
| assert len(lines) == 1 |
| assert lines[0] == 'hello\r\n'.encode(encoding='ascii') |
| |
| def test_detach_with_default(self): |
| container = self.client.create_container( |
| TEST_IMG, 'cat', |
| detach=True, stdin_open=True, tty=True |
| ) |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| |
| sock = self.client.attach_socket( |
| container, |
| {'stdin': True, 'stream': True} |
| ) |
| |
| assert_cat_socket_detached_with_keys( |
| sock, [ctrl_with('p'), ctrl_with('q')] |
| ) |
| |
| def test_detach_with_config_file(self): |
| self.client._general_configs['detachKeys'] = 'ctrl-p' |
| |
| container = self.client.create_container( |
| TEST_IMG, 'cat', |
| detach=True, stdin_open=True, tty=True |
| ) |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| |
| sock = self.client.attach_socket( |
| container, |
| {'stdin': True, 'stream': True} |
| ) |
| |
| assert_cat_socket_detached_with_keys(sock, [ctrl_with('p')]) |
| |
| def test_detach_with_arg(self): |
| self.client._general_configs['detachKeys'] = 'ctrl-p' |
| |
| container = self.client.create_container( |
| TEST_IMG, 'cat', |
| detach=True, stdin_open=True, tty=True |
| ) |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| |
| sock = self.client.attach_socket( |
| container, |
| {'stdin': True, 'stream': True, 'detachKeys': 'ctrl-x'} |
| ) |
| |
| assert_cat_socket_detached_with_keys(sock, [ctrl_with('x')]) |
| |
| |
| class PauseTest(BaseAPIIntegrationTest): |
| def test_pause_unpause(self): |
| container = self.client.create_container(TEST_IMG, ['sleep', '9999']) |
| id = container['Id'] |
| self.tmp_containers.append(id) |
| self.client.start(container) |
| self.client.pause(id) |
| container_info = self.client.inspect_container(id) |
| assert 'State' in container_info |
| state = container_info['State'] |
| assert 'ExitCode' in state |
| assert state['ExitCode'] == 0 |
| assert 'Running' in state |
| assert state['Running'] is True |
| assert 'Paused' in state |
| assert state['Paused'] is True |
| |
| self.client.unpause(id) |
| container_info = self.client.inspect_container(id) |
| assert 'State' in container_info |
| state = container_info['State'] |
| assert 'ExitCode' in state |
| assert state['ExitCode'] == 0 |
| assert 'Running' in state |
| assert state['Running'] is True |
| assert 'Paused' in state |
| assert state['Paused'] is False |
| |
| |
| class PruneTest(BaseAPIIntegrationTest): |
| @requires_api_version('1.25') |
| def test_prune_containers(self): |
| container1 = self.client.create_container( |
| TEST_IMG, ['sh', '-c', 'echo hello > /data.txt'] |
| ) |
| container2 = self.client.create_container(TEST_IMG, ['sleep', '9999']) |
| self.client.start(container1) |
| self.client.start(container2) |
| self.client.wait(container1) |
| result = self.client.prune_containers() |
| assert container1['Id'] in result['ContainersDeleted'] |
| assert result['SpaceReclaimed'] > 0 |
| assert container2['Id'] not in result['ContainersDeleted'] |
| |
| |
| class GetContainerStatsTest(BaseAPIIntegrationTest): |
| def test_get_container_stats_no_stream(self): |
| container = self.client.create_container( |
| TEST_IMG, ['sleep', '60'], |
| ) |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| response = self.client.stats(container, stream=0) |
| self.client.kill(container) |
| |
| assert type(response) == dict |
| for key in ['read', 'networks', 'precpu_stats', 'cpu_stats', |
| 'memory_stats', 'blkio_stats']: |
| assert key in response |
| |
| def test_get_container_stats_stream(self): |
| container = self.client.create_container( |
| TEST_IMG, ['sleep', '60'], |
| ) |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| stream = self.client.stats(container) |
| for chunk in stream: |
| assert type(chunk) == dict |
| for key in ['read', 'network', 'precpu_stats', 'cpu_stats', |
| 'memory_stats', 'blkio_stats']: |
| assert key in chunk |
| |
| |
| class ContainerUpdateTest(BaseAPIIntegrationTest): |
| @requires_api_version('1.22') |
| def test_update_container(self): |
| old_mem_limit = 400 * 1024 * 1024 |
| new_mem_limit = 300 * 1024 * 1024 |
| container = self.client.create_container( |
| TEST_IMG, 'top', host_config=self.client.create_host_config( |
| mem_limit=old_mem_limit |
| ) |
| ) |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| self.client.update_container(container, mem_limit=new_mem_limit) |
| inspect_data = self.client.inspect_container(container) |
| assert inspect_data['HostConfig']['Memory'] == new_mem_limit |
| |
| @requires_api_version('1.23') |
| def test_restart_policy_update(self): |
| old_restart_policy = { |
| 'MaximumRetryCount': 0, |
| 'Name': 'always' |
| } |
| new_restart_policy = { |
| 'MaximumRetryCount': 42, |
| 'Name': 'on-failure' |
| } |
| container = self.client.create_container( |
| TEST_IMG, ['sleep', '60'], |
| host_config=self.client.create_host_config( |
| restart_policy=old_restart_policy |
| ) |
| ) |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| self.client.update_container(container, |
| restart_policy=new_restart_policy) |
| inspect_data = self.client.inspect_container(container) |
| assert ( |
| inspect_data['HostConfig']['RestartPolicy']['MaximumRetryCount'] == |
| new_restart_policy['MaximumRetryCount'] |
| ) |
| assert ( |
| inspect_data['HostConfig']['RestartPolicy']['Name'] == |
| new_restart_policy['Name'] |
| ) |
| |
| |
| class ContainerCPUTest(BaseAPIIntegrationTest): |
| def test_container_cpu_shares(self): |
| cpu_shares = 512 |
| container = self.client.create_container( |
| TEST_IMG, 'ls', host_config=self.client.create_host_config( |
| cpu_shares=cpu_shares |
| ) |
| ) |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| inspect_data = self.client.inspect_container(container) |
| assert inspect_data['HostConfig']['CpuShares'] == 512 |
| |
| def test_container_cpuset(self): |
| cpuset_cpus = "0,1" |
| container = self.client.create_container( |
| TEST_IMG, 'ls', host_config=self.client.create_host_config( |
| cpuset_cpus=cpuset_cpus |
| ) |
| ) |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| inspect_data = self.client.inspect_container(container) |
| assert inspect_data['HostConfig']['CpusetCpus'] == cpuset_cpus |
| |
| @requires_api_version('1.25') |
| def test_create_with_runtime(self): |
| container = self.client.create_container( |
| TEST_IMG, ['echo', 'test'], runtime='runc' |
| ) |
| self.tmp_containers.append(container['Id']) |
| config = self.client.inspect_container(container) |
| assert config['HostConfig']['Runtime'] == 'runc' |
| |
| |
| class LinkTest(BaseAPIIntegrationTest): |
| def test_remove_link(self): |
| # Create containers |
| container1 = self.client.create_container( |
| TEST_IMG, 'cat', detach=True, stdin_open=True |
| ) |
| container1_id = container1['Id'] |
| self.tmp_containers.append(container1_id) |
| self.client.start(container1_id) |
| |
| # Create Link |
| # we don't want the first / |
| link_path = self.client.inspect_container(container1_id)['Name'][1:] |
| link_alias = 'mylink' |
| |
| container2 = self.client.create_container( |
| TEST_IMG, 'cat', host_config=self.client.create_host_config( |
| links={link_path: link_alias} |
| ) |
| ) |
| container2_id = container2['Id'] |
| self.tmp_containers.append(container2_id) |
| self.client.start(container2_id) |
| |
| # Remove link |
| linked_name = self.client.inspect_container(container2_id)['Name'][1:] |
| link_name = f'{linked_name}/{link_alias}' |
| self.client.remove_container(link_name, link=True) |
| |
| # Link is gone |
| containers = self.client.containers(all=True) |
| retrieved = [x for x in containers if link_name in x['Names']] |
| assert len(retrieved) == 0 |
| |
| # Containers are still there |
| retrieved = [ |
| x for x in containers if x['Id'].startswith(container1_id) or |
| x['Id'].startswith(container2_id) |
| ] |
| assert len(retrieved) == 2 |