blob: 0cb8fec68bf424c0af91e3171a8a05c83a4e1b15 [file] [log] [blame]
import os
import re
import signal
import tempfile
import threading
from datetime import datetime
import pytest
import requests
import docker
from .. import helpers
from ..helpers import assert_cat_socket_detached_with_keys
from ..helpers import ctrl_with
from ..helpers import requires_api_version
from .base import BaseAPIIntegrationTest
from .base import TEST_IMG
from docker.constants import IS_WINDOWS_PLATFORM
from docker.utils.socket import next_frame_header
from docker.utils.socket import read_exactly
class ListContainersTest(BaseAPIIntegrationTest):
def test_list_containers(self):
res0 = self.client.containers(all=True)
size = len(res0)
res1 = self.client.create_container(TEST_IMG, 'true')
assert 'Id' in res1
self.client.start(res1['Id'])
self.tmp_containers.append(res1['Id'])
res2 = self.client.containers(all=True)
assert size + 1 == len(res2)
retrieved = [x for x in res2 if x['Id'].startswith(res1['Id'])]
assert len(retrieved) == 1
retrieved = retrieved[0]
assert 'Command' in retrieved
assert retrieved['Command'] == 'true'
assert 'Image' in retrieved
assert re.search(r'alpine:.*', retrieved['Image'])
assert 'Status' in retrieved
class CreateContainerTest(BaseAPIIntegrationTest):
def test_create(self):
res = self.client.create_container(TEST_IMG, 'true')
assert 'Id' in res
self.tmp_containers.append(res['Id'])
def test_create_with_host_pid_mode(self):
ctnr = self.client.create_container(
TEST_IMG, 'true', host_config=self.client.create_host_config(
pid_mode='host', network_mode='none'
)
)
assert 'Id' in ctnr
self.tmp_containers.append(ctnr['Id'])
self.client.start(ctnr)
inspect = self.client.inspect_container(ctnr)
assert 'HostConfig' in inspect
host_config = inspect['HostConfig']
assert 'PidMode' in host_config
assert host_config['PidMode'] == 'host'
def test_create_with_links(self):
res0 = self.client.create_container(
TEST_IMG, 'cat',
detach=True, stdin_open=True,
environment={'FOO': '1'})
container1_id = res0['Id']
self.tmp_containers.append(container1_id)
self.client.start(container1_id)
res1 = self.client.create_container(
TEST_IMG, 'cat',
detach=True, stdin_open=True,
environment={'FOO': '1'})
container2_id = res1['Id']
self.tmp_containers.append(container2_id)
self.client.start(container2_id)
# we don't want the first /
link_path1 = self.client.inspect_container(container1_id)['Name'][1:]
link_alias1 = 'mylink1'
link_env_prefix1 = link_alias1.upper()
link_path2 = self.client.inspect_container(container2_id)['Name'][1:]
link_alias2 = 'mylink2'
link_env_prefix2 = link_alias2.upper()
res2 = self.client.create_container(
TEST_IMG, 'env', host_config=self.client.create_host_config(
links={link_path1: link_alias1, link_path2: link_alias2},
network_mode='bridge'
)
)
container3_id = res2['Id']
self.tmp_containers.append(container3_id)
self.client.start(container3_id)
assert self.client.wait(container3_id)['StatusCode'] == 0
logs = self.client.logs(container3_id).decode('utf-8')
assert f'{link_env_prefix1}_NAME=' in logs
assert f'{link_env_prefix1}_ENV_FOO=1' in logs
assert f'{link_env_prefix2}_NAME=' in logs
assert f'{link_env_prefix2}_ENV_FOO=1' in logs
def test_create_with_restart_policy(self):
container = self.client.create_container(
TEST_IMG, ['sleep', '2'],
host_config=self.client.create_host_config(
restart_policy={"Name": "always", "MaximumRetryCount": 0},
network_mode='none'
)
)
id = container['Id']
self.client.start(id)
self.client.wait(id)
with pytest.raises(docker.errors.APIError) as exc:
self.client.remove_container(id)
err = exc.value.explanation
assert 'You cannot remove ' in err
self.client.remove_container(id, force=True)
def test_create_container_with_volumes_from(self):
vol_names = ['foobar_vol0', 'foobar_vol1']
res0 = self.client.create_container(
TEST_IMG, 'true', name=vol_names[0]
)
container1_id = res0['Id']
self.tmp_containers.append(container1_id)
self.client.start(container1_id)
res1 = self.client.create_container(
TEST_IMG, 'true', name=vol_names[1]
)
container2_id = res1['Id']
self.tmp_containers.append(container2_id)
self.client.start(container2_id)
res = self.client.create_container(
TEST_IMG, 'cat', detach=True, stdin_open=True,
host_config=self.client.create_host_config(
volumes_from=vol_names, network_mode='none'
)
)
container3_id = res['Id']
self.tmp_containers.append(container3_id)
self.client.start(container3_id)
info = self.client.inspect_container(res['Id'])
assert len(info['HostConfig']['VolumesFrom']) == len(vol_names)
def create_container_readonly_fs(self):
ctnr = self.client.create_container(
TEST_IMG, ['mkdir', '/shrine'],
host_config=self.client.create_host_config(
read_only=True, network_mode='none'
)
)
assert 'Id' in ctnr
self.tmp_containers.append(ctnr['Id'])
self.client.start(ctnr)
res = self.client.wait(ctnr)['StatusCode']
assert res != 0
def create_container_with_name(self):
res = self.client.create_container(TEST_IMG, 'true', name='foobar')
assert 'Id' in res
self.tmp_containers.append(res['Id'])
inspect = self.client.inspect_container(res['Id'])
assert 'Name' in inspect
assert '/foobar' == inspect['Name']
def create_container_privileged(self):
res = self.client.create_container(
TEST_IMG, 'true', host_config=self.client.create_host_config(
privileged=True, network_mode='none'
)
)
assert 'Id' in res
self.tmp_containers.append(res['Id'])
self.client.start(res['Id'])
inspect = self.client.inspect_container(res['Id'])
assert 'Config' in inspect
assert 'Id' in inspect
assert inspect['Id'].startswith(res['Id'])
assert 'Image' in inspect
assert 'State' in inspect
assert 'Running' in inspect['State']
if not inspect['State']['Running']:
assert 'ExitCode' in inspect['State']
assert inspect['State']['ExitCode'] == 0
# Since Nov 2013, the Privileged flag is no longer part of the
# container's config exposed via the API (safety concerns?).
#
if 'Privileged' in inspect['Config']:
assert inspect['Config']['Privileged'] is True
def test_create_with_mac_address(self):
mac_address_expected = "02:42:ac:11:00:0a"
container = self.client.create_container(
TEST_IMG, ['sleep', '60'], mac_address=mac_address_expected)
id = container['Id']
self.client.start(container)
res = self.client.inspect_container(container['Id'])
assert mac_address_expected == res['NetworkSettings']['MacAddress']
self.client.kill(id)
@requires_api_version('1.41')
def test_create_with_cgroupns(self):
host_config = self.client.create_host_config(cgroupns='private')
container = self.client.create_container(
image=TEST_IMG,
command=['sleep', '60'],
host_config=host_config,
)
self.tmp_containers.append(container)
res = self.client.inspect_container(container)
assert 'private' == res['HostConfig']['CgroupnsMode']
def test_group_id_ints(self):
container = self.client.create_container(
TEST_IMG, 'id -G',
host_config=self.client.create_host_config(group_add=[1000, 1001])
)
self.tmp_containers.append(container)
self.client.start(container)
self.client.wait(container)
logs = self.client.logs(container).decode('utf-8')
groups = logs.strip().split(' ')
assert '1000' in groups
assert '1001' in groups
def test_group_id_strings(self):
container = self.client.create_container(
TEST_IMG, 'id -G', host_config=self.client.create_host_config(
group_add=['1000', '1001']
)
)
self.tmp_containers.append(container)
self.client.start(container)
self.client.wait(container)
logs = self.client.logs(container).decode('utf-8')
groups = logs.strip().split(' ')
assert '1000' in groups
assert '1001' in groups
def test_valid_log_driver_and_log_opt(self):
log_config = docker.types.LogConfig(
type='json-file',
config={'max-file': '100'}
)
container = self.client.create_container(
TEST_IMG, ['true'],
host_config=self.client.create_host_config(log_config=log_config)
)
self.tmp_containers.append(container['Id'])
self.client.start(container)
info = self.client.inspect_container(container)
container_log_config = info['HostConfig']['LogConfig']
assert container_log_config['Type'] == log_config.type
assert container_log_config['Config'] == log_config.config
def test_invalid_log_driver_raises_exception(self):
log_config = docker.types.LogConfig(
type='asdf',
config={}
)
expected_msgs = [
"logger: no log driver named 'asdf' is registered",
"error looking up logging plugin asdf: plugin \"asdf\" not found",
]
with pytest.raises(docker.errors.APIError) as excinfo:
# raises an internal server error 500
container = self.client.create_container(
TEST_IMG, ['true'], host_config=self.client.create_host_config(
log_config=log_config
)
)
self.client.start(container)
assert excinfo.value.explanation in expected_msgs
def test_valid_no_log_driver_specified(self):
log_config = docker.types.LogConfig(
type="",
config={'max-file': '100'}
)
container = self.client.create_container(
TEST_IMG, ['true'],
host_config=self.client.create_host_config(log_config=log_config)
)
self.tmp_containers.append(container['Id'])
self.client.start(container)
info = self.client.inspect_container(container)
container_log_config = info['HostConfig']['LogConfig']
assert container_log_config['Type'] == "json-file"
assert container_log_config['Config'] == log_config.config
def test_valid_no_config_specified(self):
log_config = docker.types.LogConfig(
type="json-file",
config=None
)
container = self.client.create_container(
TEST_IMG, ['true'],
host_config=self.client.create_host_config(log_config=log_config)
)
self.tmp_containers.append(container['Id'])
self.client.start(container)
info = self.client.inspect_container(container)
container_log_config = info['HostConfig']['LogConfig']
assert container_log_config['Type'] == "json-file"
assert container_log_config['Config'] == {}
def test_create_with_memory_constraints_with_str(self):
ctnr = self.client.create_container(
TEST_IMG, 'true',
host_config=self.client.create_host_config(
memswap_limit='1G',
mem_limit='700M'
)
)
assert 'Id' in ctnr
self.tmp_containers.append(ctnr['Id'])
self.client.start(ctnr)
inspect = self.client.inspect_container(ctnr)
assert 'HostConfig' in inspect
host_config = inspect['HostConfig']
for limit in ['Memory', 'MemorySwap']:
assert limit in host_config
def test_create_with_memory_constraints_with_int(self):
ctnr = self.client.create_container(
TEST_IMG, 'true',
host_config=self.client.create_host_config(mem_swappiness=40)
)
assert 'Id' in ctnr
self.tmp_containers.append(ctnr['Id'])
self.client.start(ctnr)
inspect = self.client.inspect_container(ctnr)
assert 'HostConfig' in inspect
host_config = inspect['HostConfig']
assert 'MemorySwappiness' in host_config
def test_create_with_environment_variable_no_value(self):
container = self.client.create_container(
TEST_IMG,
['echo'],
environment={'Foo': None, 'Other': 'one', 'Blank': ''},
)
self.tmp_containers.append(container['Id'])
config = self.client.inspect_container(container['Id'])
assert 'Foo' in config['Config']['Env']
assert 'Other=one' in config['Config']['Env']
assert 'Blank=' in config['Config']['Env']
@requires_api_version('1.22')
def test_create_with_tmpfs(self):
tmpfs = {
'/tmp1': 'size=3M'
}
container = self.client.create_container(
TEST_IMG,
['echo'],
host_config=self.client.create_host_config(
tmpfs=tmpfs))
self.tmp_containers.append(container['Id'])
config = self.client.inspect_container(container)
assert config['HostConfig']['Tmpfs'] == tmpfs
@requires_api_version('1.24')
def test_create_with_isolation(self):
container = self.client.create_container(
TEST_IMG, ['echo'], host_config=self.client.create_host_config(
isolation='default'
)
)
self.tmp_containers.append(container['Id'])
config = self.client.inspect_container(container)
assert config['HostConfig']['Isolation'] == 'default'
@requires_api_version('1.25')
def test_create_with_auto_remove(self):
host_config = self.client.create_host_config(
auto_remove=True
)
container = self.client.create_container(
TEST_IMG, ['echo', 'test'], host_config=host_config
)
self.tmp_containers.append(container['Id'])
config = self.client.inspect_container(container)
assert config['HostConfig']['AutoRemove'] is True
@requires_api_version('1.25')
def test_create_with_stop_timeout(self):
container = self.client.create_container(
TEST_IMG, ['echo', 'test'], stop_timeout=25
)
self.tmp_containers.append(container['Id'])
config = self.client.inspect_container(container)
assert config['Config']['StopTimeout'] == 25
@requires_api_version('1.24')
@pytest.mark.xfail(True, reason='Not supported on most drivers')
def test_create_with_storage_opt(self):
host_config = self.client.create_host_config(
storage_opt={'size': '120G'}
)
container = self.client.create_container(
TEST_IMG, ['echo', 'test'], host_config=host_config
)
self.tmp_containers.append(container)
config = self.client.inspect_container(container)
assert config['HostConfig']['StorageOpt'] == {
'size': '120G'
}
@requires_api_version('1.25')
def test_create_with_init(self):
ctnr = self.client.create_container(
TEST_IMG, 'true',
host_config=self.client.create_host_config(
init=True
)
)
self.tmp_containers.append(ctnr['Id'])
config = self.client.inspect_container(ctnr)
assert config['HostConfig']['Init'] is True
@requires_api_version('1.24')
@pytest.mark.xfail(not os.path.exists('/sys/fs/cgroup/cpu.rt_runtime_us'),
reason='CONFIG_RT_GROUP_SCHED isn\'t enabled')
def test_create_with_cpu_rt_options(self):
ctnr = self.client.create_container(
TEST_IMG, 'true', host_config=self.client.create_host_config(
cpu_rt_period=1000, cpu_rt_runtime=500
)
)
self.tmp_containers.append(ctnr)
config = self.client.inspect_container(ctnr)
assert config['HostConfig']['CpuRealtimeRuntime'] == 500
assert config['HostConfig']['CpuRealtimePeriod'] == 1000
@requires_api_version('1.28')
def test_create_with_device_cgroup_rules(self):
rule = 'c 7:128 rwm'
ctnr = self.client.create_container(
TEST_IMG, 'true', host_config=self.client.create_host_config(
device_cgroup_rules=[rule]
)
)
self.tmp_containers.append(ctnr)
config = self.client.inspect_container(ctnr)
assert config['HostConfig']['DeviceCgroupRules'] == [rule]
def test_create_with_uts_mode(self):
container = self.client.create_container(
TEST_IMG, ['echo'], host_config=self.client.create_host_config(
uts_mode='host'
)
)
self.tmp_containers.append(container)
config = self.client.inspect_container(container)
assert config['HostConfig']['UTSMode'] == 'host'
@pytest.mark.xfail(
IS_WINDOWS_PLATFORM, reason='Test not designed for Windows platform'
)
class VolumeBindTest(BaseAPIIntegrationTest):
def setUp(self):
super().setUp()
self.mount_dest = '/mnt'
# Get a random pathname - we don't need it to exist locally
self.mount_origin = tempfile.mkdtemp()
self.filename = 'shared.txt'
self.run_with_volume(
False,
TEST_IMG,
['touch', os.path.join(self.mount_dest, self.filename)],
)
def test_create_with_binds_rw(self):
container = self.run_with_volume(
False,
TEST_IMG,
['ls', self.mount_dest],
)
logs = self.client.logs(container).decode('utf-8')
assert self.filename in logs
inspect_data = self.client.inspect_container(container)
self.check_container_data(inspect_data, True)
def test_create_with_binds_ro(self):
self.run_with_volume(
False,
TEST_IMG,
['touch', os.path.join(self.mount_dest, self.filename)],
)
container = self.run_with_volume(
True,
TEST_IMG,
['ls', self.mount_dest],
)
logs = self.client.logs(container).decode('utf-8')
assert self.filename in logs
inspect_data = self.client.inspect_container(container)
self.check_container_data(inspect_data, False)
@requires_api_version('1.30')
def test_create_with_mounts(self):
mount = docker.types.Mount(
type="bind", source=self.mount_origin, target=self.mount_dest
)
host_config = self.client.create_host_config(mounts=[mount])
container = self.run_container(
TEST_IMG, ['ls', self.mount_dest],
host_config=host_config
)
assert container
logs = self.client.logs(container).decode('utf-8')
assert self.filename in logs
inspect_data = self.client.inspect_container(container)
self.check_container_data(inspect_data, True)
@requires_api_version('1.30')
def test_create_with_mounts_ro(self):
mount = docker.types.Mount(
type="bind", source=self.mount_origin, target=self.mount_dest,
read_only=True
)
host_config = self.client.create_host_config(mounts=[mount])
container = self.run_container(
TEST_IMG, ['ls', self.mount_dest],
host_config=host_config
)
assert container
logs = self.client.logs(container).decode('utf-8')
assert self.filename in logs
inspect_data = self.client.inspect_container(container)
self.check_container_data(inspect_data, False)
@requires_api_version('1.30')
def test_create_with_volume_mount(self):
mount = docker.types.Mount(
type="volume", source=helpers.random_name(),
target=self.mount_dest, labels={'com.dockerpy.test': 'true'}
)
host_config = self.client.create_host_config(mounts=[mount])
container = self.client.create_container(
TEST_IMG, ['true'], host_config=host_config,
)
assert container
inspect_data = self.client.inspect_container(container)
assert 'Mounts' in inspect_data
filtered = list(filter(
lambda x: x['Destination'] == self.mount_dest,
inspect_data['Mounts']
))
assert len(filtered) == 1
mount_data = filtered[0]
assert mount['Source'] == mount_data['Name']
assert mount_data['RW'] is True
def check_container_data(self, inspect_data, rw):
assert 'Mounts' in inspect_data
filtered = list(filter(
lambda x: x['Destination'] == self.mount_dest,
inspect_data['Mounts']
))
assert len(filtered) == 1
mount_data = filtered[0]
assert mount_data['Source'] == self.mount_origin
assert mount_data['RW'] == rw
def run_with_volume(self, ro, *args, **kwargs):
return self.run_container(
*args,
volumes={self.mount_dest: {}},
host_config=self.client.create_host_config(
binds={
self.mount_origin: {
'bind': self.mount_dest,
'ro': ro,
},
},
network_mode='none'
),
**kwargs
)
class ArchiveTest(BaseAPIIntegrationTest):
def test_get_file_archive_from_container(self):
data = 'The Maid and the Pocket Watch of Blood'
ctnr = self.client.create_container(
TEST_IMG, f'sh -c "echo {data} > /vol1/data.txt"',
volumes=['/vol1']
)
self.tmp_containers.append(ctnr)
self.client.start(ctnr)
self.client.wait(ctnr)
with tempfile.NamedTemporaryFile() as destination:
strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt')
for d in strm:
destination.write(d)
destination.seek(0)
retrieved_data = helpers.untar_file(destination, 'data.txt')\
.decode('utf-8')
assert data == retrieved_data.strip()
def test_get_file_stat_from_container(self):
data = 'The Maid and the Pocket Watch of Blood'
ctnr = self.client.create_container(
TEST_IMG, f'sh -c "echo -n {data} > /vol1/data.txt"',
volumes=['/vol1']
)
self.tmp_containers.append(ctnr)
self.client.start(ctnr)
self.client.wait(ctnr)
strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt')
assert 'name' in stat
assert stat['name'] == 'data.txt'
assert 'size' in stat
assert stat['size'] == len(data)
def test_copy_file_to_container(self):
data = b'Deaf To All But The Song'
with tempfile.NamedTemporaryFile(delete=False) as test_file:
test_file.write(data)
test_file.seek(0)
ctnr = self.client.create_container(
TEST_IMG,
'cat {}'.format(
os.path.join('/vol1/', os.path.basename(test_file.name))
),
volumes=['/vol1']
)
self.tmp_containers.append(ctnr)
with helpers.simple_tar(test_file.name) as test_tar:
self.client.put_archive(ctnr, '/vol1', test_tar)
self.client.start(ctnr)
self.client.wait(ctnr)
logs = self.client.logs(ctnr)
assert logs.strip() == data
def test_copy_directory_to_container(self):
files = ['a.py', 'b.py', 'foo/b.py']
dirs = ['foo', 'bar']
base = helpers.make_tree(dirs, files)
ctnr = self.client.create_container(
TEST_IMG, 'ls -p /vol1', volumes=['/vol1']
)
self.tmp_containers.append(ctnr)
with docker.utils.tar(base) as test_tar:
self.client.put_archive(ctnr, '/vol1', test_tar)
self.client.start(ctnr)
self.client.wait(ctnr)
logs = self.client.logs(ctnr).decode('utf-8')
results = logs.strip().split()
assert 'a.py' in results
assert 'b.py' in results
assert 'foo/' in results
assert 'bar/' in results
class RenameContainerTest(BaseAPIIntegrationTest):
def test_rename_container(self):
version = self.client.version()['Version']
name = 'hong_meiling'
res = self.client.create_container(TEST_IMG, 'true')
assert 'Id' in res
self.tmp_containers.append(res['Id'])
self.client.rename(res, name)
inspect = self.client.inspect_container(res['Id'])
assert 'Name' in inspect
if version == '1.5.0':
assert name == inspect['Name']
else:
assert f'/{name}' == inspect['Name']
class StartContainerTest(BaseAPIIntegrationTest):
def test_start_container(self):
res = self.client.create_container(TEST_IMG, 'true')
assert 'Id' in res
self.tmp_containers.append(res['Id'])
self.client.start(res['Id'])
inspect = self.client.inspect_container(res['Id'])
assert 'Config' in inspect
assert 'Id' in inspect
assert inspect['Id'].startswith(res['Id'])
assert 'Image' in inspect
assert 'State' in inspect
assert 'Running' in inspect['State']
if not inspect['State']['Running']:
assert 'ExitCode' in inspect['State']
assert inspect['State']['ExitCode'] == 0
def test_start_container_with_dict_instead_of_id(self):
res = self.client.create_container(TEST_IMG, 'true')
assert 'Id' in res
self.tmp_containers.append(res['Id'])
self.client.start(res)
inspect = self.client.inspect_container(res['Id'])
assert 'Config' in inspect
assert 'Id' in inspect
assert inspect['Id'].startswith(res['Id'])
assert 'Image' in inspect
assert 'State' in inspect
assert 'Running' in inspect['State']
if not inspect['State']['Running']:
assert 'ExitCode' in inspect['State']
assert inspect['State']['ExitCode'] == 0
def test_run_shlex_commands(self):
commands = [
'true',
'echo "The Young Descendant of Tepes & Septette for the '
'Dead Princess"',
'echo -n "The Young Descendant of Tepes & Septette for the '
'Dead Princess"',
'/bin/sh -c "echo Hello World"',
'/bin/sh -c \'echo "Hello World"\'',
'echo "\"Night of Nights\""',
'true && echo "Night of Nights"'
]
for cmd in commands:
container = self.client.create_container(TEST_IMG, cmd)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)['StatusCode']
assert exitcode == 0, cmd
class WaitTest(BaseAPIIntegrationTest):
def test_wait(self):
res = self.client.create_container(TEST_IMG, ['sleep', '3'])
id = res['Id']
self.tmp_containers.append(id)
self.client.start(id)
exitcode = self.client.wait(id)['StatusCode']
assert exitcode == 0
inspect = self.client.inspect_container(id)
assert 'Running' in inspect['State']
assert inspect['State']['Running'] is False
assert 'ExitCode' in inspect['State']
assert inspect['State']['ExitCode'] == exitcode
def test_wait_with_dict_instead_of_id(self):
res = self.client.create_container(TEST_IMG, ['sleep', '3'])
id = res['Id']
self.tmp_containers.append(id)
self.client.start(res)
exitcode = self.client.wait(res)['StatusCode']
assert exitcode == 0
inspect = self.client.inspect_container(res)
assert 'Running' in inspect['State']
assert inspect['State']['Running'] is False
assert 'ExitCode' in inspect['State']
assert inspect['State']['ExitCode'] == exitcode
@requires_api_version('1.30')
def test_wait_with_condition(self):
ctnr = self.client.create_container(TEST_IMG, 'true')
self.tmp_containers.append(ctnr)
with pytest.raises(requests.exceptions.ConnectionError):
self.client.wait(ctnr, condition='removed', timeout=1)
ctnr = self.client.create_container(
TEST_IMG, ['sleep', '3'],
host_config=self.client.create_host_config(auto_remove=True)
)
self.tmp_containers.append(ctnr)
self.client.start(ctnr)
assert self.client.wait(
ctnr, condition='removed', timeout=5
)['StatusCode'] == 0
class LogsTest(BaseAPIIntegrationTest):
def test_logs(self):
snippet = 'Flowering Nights (Sakuya Iyazoi)'
container = self.client.create_container(
TEST_IMG, f'echo {snippet}'
)
id = container['Id']
self.tmp_containers.append(id)
self.client.start(id)
exitcode = self.client.wait(id)['StatusCode']
assert exitcode == 0
logs = self.client.logs(id)
assert logs == (snippet + '\n').encode(encoding='ascii')
def test_logs_tail_option(self):
snippet = '''Line1
Line2'''
container = self.client.create_container(
TEST_IMG, f'echo "{snippet}"'
)
id = container['Id']
self.tmp_containers.append(id)
self.client.start(id)
exitcode = self.client.wait(id)['StatusCode']
assert exitcode == 0
logs = self.client.logs(id, tail=1)
assert logs == 'Line2\n'.encode(encoding='ascii')
def test_logs_streaming_and_follow(self):
snippet = 'Flowering Nights (Sakuya Iyazoi)'
container = self.client.create_container(
TEST_IMG, f'echo {snippet}'
)
id = container['Id']
self.tmp_containers.append(id)
self.client.start(id)
logs = b''
for chunk in self.client.logs(id, stream=True, follow=True):
logs += chunk
exitcode = self.client.wait(id)['StatusCode']
assert exitcode == 0
assert logs == (snippet + '\n').encode(encoding='ascii')
@pytest.mark.timeout(5)
@pytest.mark.skipif(os.environ.get('DOCKER_HOST', '').startswith('ssh://'),
reason='No cancellable streams over SSH')
def test_logs_streaming_and_follow_and_cancel(self):
snippet = 'Flowering Nights (Sakuya Iyazoi)'
container = self.client.create_container(
TEST_IMG, f'sh -c "echo \\"{snippet}\\" && sleep 3"'
)
id = container['Id']
self.tmp_containers.append(id)
self.client.start(id)
logs = b''
generator = self.client.logs(id, stream=True, follow=True)
threading.Timer(1, generator.close).start()
for chunk in generator:
logs += chunk
assert logs == (snippet + '\n').encode(encoding='ascii')
def test_logs_with_dict_instead_of_id(self):
snippet = 'Flowering Nights (Sakuya Iyazoi)'
container = self.client.create_container(
TEST_IMG, f'echo {snippet}'
)
id = container['Id']
self.tmp_containers.append(id)
self.client.start(id)
exitcode = self.client.wait(id)['StatusCode']
assert exitcode == 0
logs = self.client.logs(container)
assert logs == (snippet + '\n').encode(encoding='ascii')
def test_logs_with_tail_0(self):
snippet = 'Flowering Nights (Sakuya Iyazoi)'
container = self.client.create_container(
TEST_IMG, f'echo "{snippet}"'
)
id = container['Id']
self.tmp_containers.append(id)
self.client.start(id)
exitcode = self.client.wait(id)['StatusCode']
assert exitcode == 0
logs = self.client.logs(id, tail=0)
assert logs == ''.encode(encoding='ascii')
@requires_api_version('1.35')
def test_logs_with_until(self):
snippet = 'Shanghai Teahouse (Hong Meiling)'
container = self.client.create_container(
TEST_IMG, f'echo "{snippet}"'
)
self.tmp_containers.append(container)
self.client.start(container)
exitcode = self.client.wait(container)['StatusCode']
assert exitcode == 0
logs_until_1 = self.client.logs(container, until=1)
assert logs_until_1 == b''
logs_until_now = self.client.logs(container, datetime.now())
assert logs_until_now == (snippet + '\n').encode(encoding='ascii')
class DiffTest(BaseAPIIntegrationTest):
def test_diff(self):
container = self.client.create_container(TEST_IMG, ['touch', '/test'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)['StatusCode']
assert exitcode == 0
diff = self.client.diff(id)
test_diff = [x for x in diff if x.get('Path', None) == '/test']
assert len(test_diff) == 1
assert 'Kind' in test_diff[0]
assert test_diff[0]['Kind'] == 1
def test_diff_with_dict_instead_of_id(self):
container = self.client.create_container(TEST_IMG, ['touch', '/test'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
exitcode = self.client.wait(id)['StatusCode']
assert exitcode == 0
diff = self.client.diff(container)
test_diff = [x for x in diff if x.get('Path', None) == '/test']
assert len(test_diff) == 1
assert 'Kind' in test_diff[0]
assert test_diff[0]['Kind'] == 1
class StopTest(BaseAPIIntegrationTest):
def test_stop(self):
container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
self.client.stop(id, timeout=2)
container_info = self.client.inspect_container(id)
assert 'State' in container_info
state = container_info['State']
assert 'Running' in state
assert state['Running'] is False
def test_stop_with_dict_instead_of_id(self):
container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
assert 'Id' in container
id = container['Id']
self.client.start(container)
self.tmp_containers.append(id)
self.client.stop(container, timeout=2)
container_info = self.client.inspect_container(id)
assert 'State' in container_info
state = container_info['State']
assert 'Running' in state
assert state['Running'] is False
class KillTest(BaseAPIIntegrationTest):
def test_kill(self):
container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
self.client.kill(id)
container_info = self.client.inspect_container(id)
assert 'State' in container_info
state = container_info['State']
assert 'ExitCode' in state
assert state['ExitCode'] != 0
assert 'Running' in state
assert state['Running'] is False
def test_kill_with_dict_instead_of_id(self):
container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
self.client.kill(container)
container_info = self.client.inspect_container(id)
assert 'State' in container_info
state = container_info['State']
assert 'ExitCode' in state
assert state['ExitCode'] != 0
assert 'Running' in state
assert state['Running'] is False
def test_kill_with_signal(self):
id = self.client.create_container(TEST_IMG, ['sleep', '60'])
self.tmp_containers.append(id)
self.client.start(id)
self.client.kill(
id, signal=signal.SIGKILL if not IS_WINDOWS_PLATFORM else 9
)
exitcode = self.client.wait(id)['StatusCode']
assert exitcode != 0
container_info = self.client.inspect_container(id)
assert 'State' in container_info
state = container_info['State']
assert 'ExitCode' in state
assert state['ExitCode'] != 0
assert 'Running' in state
assert state['Running'] is False, state
def test_kill_with_signal_name(self):
id = self.client.create_container(TEST_IMG, ['sleep', '60'])
self.client.start(id)
self.tmp_containers.append(id)
self.client.kill(id, signal='SIGKILL')
exitcode = self.client.wait(id)['StatusCode']
assert exitcode != 0
container_info = self.client.inspect_container(id)
assert 'State' in container_info
state = container_info['State']
assert 'ExitCode' in state
assert state['ExitCode'] != 0
assert 'Running' in state
assert state['Running'] is False, state
def test_kill_with_signal_integer(self):
id = self.client.create_container(TEST_IMG, ['sleep', '60'])
self.client.start(id)
self.tmp_containers.append(id)
self.client.kill(id, signal=9)
exitcode = self.client.wait(id)['StatusCode']
assert exitcode != 0
container_info = self.client.inspect_container(id)
assert 'State' in container_info
state = container_info['State']
assert 'ExitCode' in state
assert state['ExitCode'] != 0
assert 'Running' in state
assert state['Running'] is False, state
class PortTest(BaseAPIIntegrationTest):
def test_port(self):
port_bindings = {
'1111': ('127.0.0.1', '4567'),
'2222': ('127.0.0.1', '4568'),
'3333/udp': ('127.0.0.1', '4569'),
}
ports = [
1111,
2222,
(3333, 'udp'),
]
container = self.client.create_container(
TEST_IMG, ['sleep', '60'], ports=ports,
host_config=self.client.create_host_config(
port_bindings=port_bindings, network_mode='bridge'
)
)
id = container['Id']
self.client.start(container)
# Call the port function on each biding and compare expected vs actual
for port in port_bindings:
port, _, protocol = port.partition('/')
actual_bindings = self.client.port(container, port)
port_binding = actual_bindings.pop()
ip, host_port = port_binding['HostIp'], port_binding['HostPort']
port_binding = port if not protocol else port + "/" + protocol
assert ip == port_bindings[port_binding][0]
assert host_port == port_bindings[port_binding][1]
self.client.kill(id)
class ContainerTopTest(BaseAPIIntegrationTest):
@pytest.mark.xfail(reason='Output of docker top depends on host distro, '
'and is not formalized.')
def test_top(self):
container = self.client.create_container(
TEST_IMG, ['sleep', '60']
)
self.tmp_containers.append(container)
self.client.start(container)
res = self.client.top(container)
if not IS_WINDOWS_PLATFORM:
assert res['Titles'] == ['PID', 'USER', 'TIME', 'COMMAND']
assert len(res['Processes']) == 1
assert res['Processes'][0][-1] == 'sleep 60'
self.client.kill(container)
@pytest.mark.skipif(
IS_WINDOWS_PLATFORM, reason='No psargs support on windows'
)
@pytest.mark.xfail(reason='Output of docker top depends on host distro, '
'and is not formalized.')
def test_top_with_psargs(self):
container = self.client.create_container(
TEST_IMG, ['sleep', '60'])
self.tmp_containers.append(container)
self.client.start(container)
res = self.client.top(container, '-eopid,user')
assert res['Titles'] == ['PID', 'USER']
assert len(res['Processes']) == 1
assert res['Processes'][0][10] == 'sleep 60'
class RestartContainerTest(BaseAPIIntegrationTest):
def test_restart(self):
container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
info = self.client.inspect_container(id)
assert 'State' in info
assert 'StartedAt' in info['State']
start_time1 = info['State']['StartedAt']
self.client.restart(id, timeout=2)
info2 = self.client.inspect_container(id)
assert 'State' in info2
assert 'StartedAt' in info2['State']
start_time2 = info2['State']['StartedAt']
assert start_time1 != start_time2
assert 'Running' in info2['State']
assert info2['State']['Running'] is True
self.client.kill(id)
def test_restart_with_low_timeout(self):
container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
self.client.start(container)
self.client.timeout = 3
self.client.restart(container, timeout=1)
self.client.timeout = None
self.client.restart(container, timeout=1)
self.client.kill(container)
def test_restart_with_dict_instead_of_id(self):
container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
assert 'Id' in container
id = container['Id']
self.client.start(container)
self.tmp_containers.append(id)
info = self.client.inspect_container(id)
assert 'State' in info
assert 'StartedAt' in info['State']
start_time1 = info['State']['StartedAt']
self.client.restart(container, timeout=2)
info2 = self.client.inspect_container(id)
assert 'State' in info2
assert 'StartedAt' in info2['State']
start_time2 = info2['State']['StartedAt']
assert start_time1 != start_time2
assert 'Running' in info2['State']
assert info2['State']['Running'] is True
self.client.kill(id)
class RemoveContainerTest(BaseAPIIntegrationTest):
def test_remove(self):
container = self.client.create_container(TEST_IMG, ['true'])
id = container['Id']
self.client.start(id)
self.client.wait(id)
self.client.remove_container(id)
containers = self.client.containers(all=True)
res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
assert len(res) == 0
def test_remove_with_dict_instead_of_id(self):
container = self.client.create_container(TEST_IMG, ['true'])
id = container['Id']
self.client.start(id)
self.client.wait(id)
self.client.remove_container(container)
containers = self.client.containers(all=True)
res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
assert len(res) == 0
class AttachContainerTest(BaseAPIIntegrationTest):
def test_run_container_streaming(self):
container = self.client.create_container(TEST_IMG, '/bin/sh',
detach=True, stdin_open=True)
id = container['Id']
self.tmp_containers.append(id)
self.client.start(id)
sock = self.client.attach_socket(container, ws=False)
assert sock.fileno() > -1
def test_run_container_reading_socket_http(self):
line = 'hi there and stuff and things, words!'
# `echo` appends CRLF, `printf` doesn't
command = f"printf '{line}'"
container = self.client.create_container(TEST_IMG, command,
detach=True, tty=False)
self.tmp_containers.append(container)
opts = {"stdout": 1, "stream": 1, "logs": 1}
pty_stdout = self.client.attach_socket(container, opts)
self.addCleanup(pty_stdout.close)
self.client.start(container)
(stream, next_size) = next_frame_header(pty_stdout)
assert stream == 1 # correspond to stdout
assert next_size == len(line)
data = read_exactly(pty_stdout, next_size)
assert data.decode('utf-8') == line
@pytest.mark.xfail(condition=bool(os.environ.get('DOCKER_CERT_PATH', '')),
reason='DOCKER_CERT_PATH not respected for websockets')
def test_run_container_reading_socket_ws(self):
line = 'hi there and stuff and things, words!'
# `echo` appends CRLF, `printf` doesn't
command = f"printf '{line}'"
container = self.client.create_container(TEST_IMG, command,
detach=True, tty=False)
self.tmp_containers.append(container)
opts = {"stdout": 1, "stream": 1, "logs": 1}
pty_stdout = self.client.attach_socket(container, opts, ws=True)
self.addCleanup(pty_stdout.close)
self.client.start(container)
data = pty_stdout.recv()
assert data.decode('utf-8') == line
@pytest.mark.timeout(10)
def test_attach_no_stream(self):
container = self.client.create_container(
TEST_IMG, 'echo hello'
)
self.tmp_containers.append(container)
self.client.start(container)
self.client.wait(container, condition='not-running')
output = self.client.attach(container, stream=False, logs=True)
assert output == 'hello\n'.encode(encoding='ascii')
@pytest.mark.timeout(10)
@pytest.mark.skipif(os.environ.get('DOCKER_HOST', '').startswith('ssh://'),
reason='No cancellable streams over SSH')
@pytest.mark.xfail(condition=os.environ.get('DOCKER_TLS_VERIFY') or
os.environ.get('DOCKER_CERT_PATH'),
reason='Flaky test on TLS')
def test_attach_stream_and_cancel(self):
container = self.client.create_container(
TEST_IMG, 'sh -c "sleep 2 && echo hello && sleep 60"',
tty=True
)
self.tmp_containers.append(container)
self.client.start(container)
output = self.client.attach(container, stream=True, logs=True)
threading.Timer(3, output.close).start()
lines = []
for line in output:
lines.append(line)
assert len(lines) == 1
assert lines[0] == 'hello\r\n'.encode(encoding='ascii')
def test_detach_with_default(self):
container = self.client.create_container(
TEST_IMG, 'cat',
detach=True, stdin_open=True, tty=True
)
self.tmp_containers.append(container)
self.client.start(container)
sock = self.client.attach_socket(
container,
{'stdin': True, 'stream': True}
)
assert_cat_socket_detached_with_keys(
sock, [ctrl_with('p'), ctrl_with('q')]
)
def test_detach_with_config_file(self):
self.client._general_configs['detachKeys'] = 'ctrl-p'
container = self.client.create_container(
TEST_IMG, 'cat',
detach=True, stdin_open=True, tty=True
)
self.tmp_containers.append(container)
self.client.start(container)
sock = self.client.attach_socket(
container,
{'stdin': True, 'stream': True}
)
assert_cat_socket_detached_with_keys(sock, [ctrl_with('p')])
def test_detach_with_arg(self):
self.client._general_configs['detachKeys'] = 'ctrl-p'
container = self.client.create_container(
TEST_IMG, 'cat',
detach=True, stdin_open=True, tty=True
)
self.tmp_containers.append(container)
self.client.start(container)
sock = self.client.attach_socket(
container,
{'stdin': True, 'stream': True, 'detachKeys': 'ctrl-x'}
)
assert_cat_socket_detached_with_keys(sock, [ctrl_with('x')])
class PauseTest(BaseAPIIntegrationTest):
def test_pause_unpause(self):
container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
id = container['Id']
self.tmp_containers.append(id)
self.client.start(container)
self.client.pause(id)
container_info = self.client.inspect_container(id)
assert 'State' in container_info
state = container_info['State']
assert 'ExitCode' in state
assert state['ExitCode'] == 0
assert 'Running' in state
assert state['Running'] is True
assert 'Paused' in state
assert state['Paused'] is True
self.client.unpause(id)
container_info = self.client.inspect_container(id)
assert 'State' in container_info
state = container_info['State']
assert 'ExitCode' in state
assert state['ExitCode'] == 0
assert 'Running' in state
assert state['Running'] is True
assert 'Paused' in state
assert state['Paused'] is False
class PruneTest(BaseAPIIntegrationTest):
@requires_api_version('1.25')
def test_prune_containers(self):
container1 = self.client.create_container(
TEST_IMG, ['sh', '-c', 'echo hello > /data.txt']
)
container2 = self.client.create_container(TEST_IMG, ['sleep', '9999'])
self.client.start(container1)
self.client.start(container2)
self.client.wait(container1)
result = self.client.prune_containers()
assert container1['Id'] in result['ContainersDeleted']
assert result['SpaceReclaimed'] > 0
assert container2['Id'] not in result['ContainersDeleted']
class GetContainerStatsTest(BaseAPIIntegrationTest):
def test_get_container_stats_no_stream(self):
container = self.client.create_container(
TEST_IMG, ['sleep', '60'],
)
self.tmp_containers.append(container)
self.client.start(container)
response = self.client.stats(container, stream=0)
self.client.kill(container)
assert type(response) == dict
for key in ['read', 'networks', 'precpu_stats', 'cpu_stats',
'memory_stats', 'blkio_stats']:
assert key in response
def test_get_container_stats_stream(self):
container = self.client.create_container(
TEST_IMG, ['sleep', '60'],
)
self.tmp_containers.append(container)
self.client.start(container)
stream = self.client.stats(container)
for chunk in stream:
assert type(chunk) == dict
for key in ['read', 'network', 'precpu_stats', 'cpu_stats',
'memory_stats', 'blkio_stats']:
assert key in chunk
class ContainerUpdateTest(BaseAPIIntegrationTest):
@requires_api_version('1.22')
def test_update_container(self):
old_mem_limit = 400 * 1024 * 1024
new_mem_limit = 300 * 1024 * 1024
container = self.client.create_container(
TEST_IMG, 'top', host_config=self.client.create_host_config(
mem_limit=old_mem_limit
)
)
self.tmp_containers.append(container)
self.client.start(container)
self.client.update_container(container, mem_limit=new_mem_limit)
inspect_data = self.client.inspect_container(container)
assert inspect_data['HostConfig']['Memory'] == new_mem_limit
@requires_api_version('1.23')
def test_restart_policy_update(self):
old_restart_policy = {
'MaximumRetryCount': 0,
'Name': 'always'
}
new_restart_policy = {
'MaximumRetryCount': 42,
'Name': 'on-failure'
}
container = self.client.create_container(
TEST_IMG, ['sleep', '60'],
host_config=self.client.create_host_config(
restart_policy=old_restart_policy
)
)
self.tmp_containers.append(container)
self.client.start(container)
self.client.update_container(container,
restart_policy=new_restart_policy)
inspect_data = self.client.inspect_container(container)
assert (
inspect_data['HostConfig']['RestartPolicy']['MaximumRetryCount'] ==
new_restart_policy['MaximumRetryCount']
)
assert (
inspect_data['HostConfig']['RestartPolicy']['Name'] ==
new_restart_policy['Name']
)
class ContainerCPUTest(BaseAPIIntegrationTest):
def test_container_cpu_shares(self):
cpu_shares = 512
container = self.client.create_container(
TEST_IMG, 'ls', host_config=self.client.create_host_config(
cpu_shares=cpu_shares
)
)
self.tmp_containers.append(container)
self.client.start(container)
inspect_data = self.client.inspect_container(container)
assert inspect_data['HostConfig']['CpuShares'] == 512
def test_container_cpuset(self):
cpuset_cpus = "0,1"
container = self.client.create_container(
TEST_IMG, 'ls', host_config=self.client.create_host_config(
cpuset_cpus=cpuset_cpus
)
)
self.tmp_containers.append(container)
self.client.start(container)
inspect_data = self.client.inspect_container(container)
assert inspect_data['HostConfig']['CpusetCpus'] == cpuset_cpus
@requires_api_version('1.25')
def test_create_with_runtime(self):
container = self.client.create_container(
TEST_IMG, ['echo', 'test'], runtime='runc'
)
self.tmp_containers.append(container['Id'])
config = self.client.inspect_container(container)
assert config['HostConfig']['Runtime'] == 'runc'
class LinkTest(BaseAPIIntegrationTest):
def test_remove_link(self):
# Create containers
container1 = self.client.create_container(
TEST_IMG, 'cat', detach=True, stdin_open=True
)
container1_id = container1['Id']
self.tmp_containers.append(container1_id)
self.client.start(container1_id)
# Create Link
# we don't want the first /
link_path = self.client.inspect_container(container1_id)['Name'][1:]
link_alias = 'mylink'
container2 = self.client.create_container(
TEST_IMG, 'cat', host_config=self.client.create_host_config(
links={link_path: link_alias}
)
)
container2_id = container2['Id']
self.tmp_containers.append(container2_id)
self.client.start(container2_id)
# Remove link
linked_name = self.client.inspect_container(container2_id)['Name'][1:]
link_name = f'{linked_name}/{link_alias}'
self.client.remove_container(link_name, link=True)
# Link is gone
containers = self.client.containers(all=True)
retrieved = [x for x in containers if link_name in x['Names']]
assert len(retrieved) == 0
# Containers are still there
retrieved = [
x for x in containers if x['Id'].startswith(container1_id) or
x['Id'].startswith(container2_id)
]
assert len(retrieved) == 2