| from ..helpers import assert_cat_socket_detached_with_keys |
| from ..helpers import ctrl_with |
| from ..helpers import requires_api_version |
| from .base import BaseAPIIntegrationTest |
| from .base import TEST_IMG |
| from docker.utils.proxy import ProxyConfig |
| from docker.utils.socket import next_frame_header |
| from docker.utils.socket import read_exactly |
| |
| |
| class ExecTest(BaseAPIIntegrationTest): |
| def test_execute_command_with_proxy_env(self): |
| # Set a custom proxy config on the client |
| self.client._proxy_configs = ProxyConfig( |
| ftp='a', https='b', http='c', no_proxy='d' |
| ) |
| |
| container = self.client.create_container( |
| TEST_IMG, 'cat', detach=True, stdin_open=True, |
| ) |
| self.client.start(container) |
| self.tmp_containers.append(container) |
| |
| cmd = 'sh -c "env | grep -i proxy"' |
| |
| # First, just make sure the environment variables from the custom |
| # config are set |
| |
| res = self.client.exec_create(container, cmd=cmd) |
| output = self.client.exec_start(res).decode('utf-8').split('\n') |
| expected = [ |
| 'ftp_proxy=a', 'https_proxy=b', 'http_proxy=c', 'no_proxy=d', |
| 'FTP_PROXY=a', 'HTTPS_PROXY=b', 'HTTP_PROXY=c', 'NO_PROXY=d' |
| ] |
| for item in expected: |
| assert item in output |
| |
| # Overwrite some variables with a custom environment |
| env = {'https_proxy': 'xxx', 'HTTPS_PROXY': 'XXX'} |
| |
| res = self.client.exec_create(container, cmd=cmd, environment=env) |
| output = self.client.exec_start(res).decode('utf-8').split('\n') |
| expected = [ |
| 'ftp_proxy=a', 'https_proxy=xxx', 'http_proxy=c', 'no_proxy=d', |
| 'FTP_PROXY=a', 'HTTPS_PROXY=XXX', 'HTTP_PROXY=c', 'NO_PROXY=d' |
| ] |
| for item in expected: |
| assert item in output |
| |
| def test_execute_command(self): |
| container = self.client.create_container(TEST_IMG, 'cat', |
| detach=True, stdin_open=True) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| |
| res = self.client.exec_create(id, ['echo', 'hello']) |
| assert 'Id' in res |
| |
| exec_log = self.client.exec_start(res) |
| assert exec_log == b'hello\n' |
| |
| def test_exec_command_string(self): |
| container = self.client.create_container(TEST_IMG, 'cat', |
| detach=True, stdin_open=True) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| |
| res = self.client.exec_create(id, 'echo hello world') |
| assert 'Id' in res |
| |
| exec_log = self.client.exec_start(res) |
| assert exec_log == b'hello world\n' |
| |
| def test_exec_command_as_user(self): |
| container = self.client.create_container(TEST_IMG, 'cat', |
| detach=True, stdin_open=True) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| |
| res = self.client.exec_create(id, 'whoami', user='postgres') |
| assert 'Id' in res |
| |
| exec_log = self.client.exec_start(res) |
| assert exec_log == b'postgres\n' |
| |
| def test_exec_command_as_root(self): |
| container = self.client.create_container(TEST_IMG, 'cat', |
| detach=True, stdin_open=True) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| |
| res = self.client.exec_create(id, 'whoami') |
| assert 'Id' in res |
| |
| exec_log = self.client.exec_start(res) |
| assert exec_log == b'root\n' |
| |
| def test_exec_command_streaming(self): |
| container = self.client.create_container(TEST_IMG, 'cat', |
| detach=True, stdin_open=True) |
| id = container['Id'] |
| self.tmp_containers.append(id) |
| self.client.start(id) |
| |
| exec_id = self.client.exec_create(id, ['echo', 'hello\nworld']) |
| assert 'Id' in exec_id |
| |
| res = b'' |
| for chunk in self.client.exec_start(exec_id, stream=True): |
| res += chunk |
| assert res == b'hello\nworld\n' |
| |
| def test_exec_start_socket(self): |
| container = self.client.create_container(TEST_IMG, 'cat', |
| detach=True, stdin_open=True) |
| container_id = container['Id'] |
| self.client.start(container_id) |
| self.tmp_containers.append(container_id) |
| |
| line = 'yay, interactive exec!' |
| # `echo` appends CRLF, `printf` doesn't |
| exec_id = self.client.exec_create( |
| container_id, ['printf', line], tty=True) |
| assert 'Id' in exec_id |
| |
| socket = self.client.exec_start(exec_id, socket=True) |
| self.addCleanup(socket.close) |
| |
| (stream, next_size) = next_frame_header(socket) |
| assert stream == 1 # stdout (0 = stdin, 1 = stdout, 2 = stderr) |
| assert next_size == len(line) |
| data = read_exactly(socket, next_size) |
| assert data.decode('utf-8') == line |
| |
| def test_exec_start_detached(self): |
| container = self.client.create_container(TEST_IMG, 'cat', |
| detach=True, stdin_open=True) |
| container_id = container['Id'] |
| self.client.start(container_id) |
| self.tmp_containers.append(container_id) |
| |
| exec_id = self.client.exec_create( |
| container_id, ['printf', "asdqwe"]) |
| assert 'Id' in exec_id |
| |
| response = self.client.exec_start(exec_id, detach=True) |
| |
| assert response == "" |
| |
| def test_exec_inspect(self): |
| container = self.client.create_container(TEST_IMG, 'cat', |
| detach=True, stdin_open=True) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| |
| exec_id = self.client.exec_create(id, ['mkdir', '/does/not/exist']) |
| assert 'Id' in exec_id |
| self.client.exec_start(exec_id) |
| exec_info = self.client.exec_inspect(exec_id) |
| assert 'ExitCode' in exec_info |
| assert exec_info['ExitCode'] != 0 |
| |
| @requires_api_version('1.25') |
| def test_exec_command_with_env(self): |
| container = self.client.create_container(TEST_IMG, 'cat', |
| detach=True, stdin_open=True) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| |
| res = self.client.exec_create(id, 'env', environment=["X=Y"]) |
| assert 'Id' in res |
| |
| exec_log = self.client.exec_start(res) |
| assert b'X=Y\n' in exec_log |
| |
| @requires_api_version('1.35') |
| def test_exec_command_with_workdir(self): |
| container = self.client.create_container( |
| TEST_IMG, 'cat', detach=True, stdin_open=True |
| ) |
| self.tmp_containers.append(container) |
| self.client.start(container) |
| |
| res = self.client.exec_create(container, 'pwd', workdir='/var/opt') |
| exec_log = self.client.exec_start(res) |
| assert exec_log == b'/var/opt\n' |
| |
| def test_detach_with_default(self): |
| container = self.client.create_container( |
| TEST_IMG, 'cat', detach=True, stdin_open=True |
| ) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| |
| exec_id = self.client.exec_create( |
| id, 'cat', stdin=True, tty=True, stdout=True |
| ) |
| sock = self.client.exec_start(exec_id, tty=True, socket=True) |
| self.addCleanup(sock.close) |
| |
| assert_cat_socket_detached_with_keys( |
| sock, [ctrl_with('p'), ctrl_with('q')] |
| ) |
| |
| def test_detach_with_config_file(self): |
| self.client._general_configs['detachKeys'] = 'ctrl-p' |
| container = self.client.create_container( |
| TEST_IMG, 'cat', detach=True, stdin_open=True |
| ) |
| id = container['Id'] |
| self.client.start(id) |
| self.tmp_containers.append(id) |
| |
| exec_id = self.client.exec_create( |
| id, 'cat', stdin=True, tty=True, stdout=True |
| ) |
| sock = self.client.exec_start(exec_id, tty=True, socket=True) |
| self.addCleanup(sock.close) |
| |
| assert_cat_socket_detached_with_keys(sock, [ctrl_with('p')]) |
| |
| |
| class ExecDemuxTest(BaseAPIIntegrationTest): |
| cmd = 'sh -c "{}"'.format(' ; '.join([ |
| # Write something on stdout |
| 'echo hello out', |
| # Busybox's sleep does not handle sub-second times. |
| # This loops takes ~0.3 second to execute on my machine. |
| 'sleep 0.5', |
| # Write something on stderr |
| 'echo hello err >&2']) |
| ) |
| |
| def setUp(self): |
| super(ExecDemuxTest, self).setUp() |
| self.container = self.client.create_container( |
| TEST_IMG, 'cat', detach=True, stdin_open=True |
| ) |
| self.client.start(self.container) |
| self.tmp_containers.append(self.container) |
| |
| def test_exec_command_no_stream_no_demux(self): |
| # tty=False, stream=False, demux=False |
| res = self.client.exec_create(self.container, self.cmd) |
| exec_log = self.client.exec_start(res) |
| assert b'hello out\n' in exec_log |
| assert b'hello err\n' in exec_log |
| |
| def test_exec_command_stream_no_demux(self): |
| # tty=False, stream=True, demux=False |
| res = self.client.exec_create(self.container, self.cmd) |
| exec_log = list(self.client.exec_start(res, stream=True)) |
| assert len(exec_log) == 2 |
| assert b'hello out\n' in exec_log |
| assert b'hello err\n' in exec_log |
| |
| def test_exec_command_no_stream_demux(self): |
| # tty=False, stream=False, demux=True |
| res = self.client.exec_create(self.container, self.cmd) |
| exec_log = self.client.exec_start(res, demux=True) |
| assert exec_log == (b'hello out\n', b'hello err\n') |
| |
| def test_exec_command_stream_demux(self): |
| # tty=False, stream=True, demux=True |
| res = self.client.exec_create(self.container, self.cmd) |
| exec_log = list(self.client.exec_start(res, demux=True, stream=True)) |
| assert len(exec_log) == 2 |
| assert (b'hello out\n', None) in exec_log |
| assert (None, b'hello err\n') in exec_log |
| |
| def test_exec_command_tty_no_stream_no_demux(self): |
| # tty=True, stream=False, demux=False |
| res = self.client.exec_create(self.container, self.cmd, tty=True) |
| exec_log = self.client.exec_start(res) |
| assert exec_log == b'hello out\r\nhello err\r\n' |
| |
| def test_exec_command_tty_stream_no_demux(self): |
| # tty=True, stream=True, demux=False |
| res = self.client.exec_create(self.container, self.cmd, tty=True) |
| exec_log = list(self.client.exec_start(res, stream=True)) |
| assert b'hello out\r\n' in exec_log |
| if len(exec_log) == 2: |
| assert b'hello err\r\n' in exec_log |
| else: |
| assert len(exec_log) == 3 |
| assert b'hello err' in exec_log |
| assert b'\r\n' in exec_log |
| |
| def test_exec_command_tty_no_stream_demux(self): |
| # tty=True, stream=False, demux=True |
| res = self.client.exec_create(self.container, self.cmd, tty=True) |
| exec_log = self.client.exec_start(res, demux=True) |
| assert exec_log == (b'hello out\r\nhello err\r\n', None) |
| |
| def test_exec_command_tty_stream_demux(self): |
| # tty=True, stream=True, demux=True |
| res = self.client.exec_create(self.container, self.cmd, tty=True) |
| exec_log = list(self.client.exec_start(res, demux=True, stream=True)) |
| assert (b'hello out\r\n', None) in exec_log |
| if len(exec_log) == 2: |
| assert (b'hello err\r\n', None) in exec_log |
| else: |
| assert len(exec_log) == 3 |
| assert (b'hello err', None) in exec_log |
| assert (b'\r\n', None) in exec_log |