blob: 5b642cdaccb6c0a13db5409a86cd54b15d1774b4 [file] [log] [blame]
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Start and stop tsproxy."""
from __future__ import absolute_import
import locale
import logging
import os
import re
import signal
import subprocess
import sys
import time
import six
try:
import fcntl
except ImportError:
fcntl = None
import py_utils
from py_utils import retry_util
from py_utils import atexit_with_log
_TSPROXY_PATH = os.path.join(
py_utils.GetCatapultDir(), 'third_party', 'tsproxy', 'tsproxy.py')
class TsProxyServerError(Exception):
"""Catch-all exception for tsProxy Server."""
def ParseTsProxyPortFromOutput(output_line):
port_re = re.compile(
r'Started Socks5 proxy server on '
r'(?P<host>[^:]*):'
r'(?P<port>\d+)')
m = port_re.match(output_line)
if m:
return int(m.group('port'))
return None
class TsProxyServer():
"""Start and stop tsproxy.
TsProxy provides basic latency, download and upload traffic shaping. This
class provides a programming API to the tsproxy script in
catapult/third_party/tsproxy/tsproxy.py
This class can be used as a context manager.
"""
def __init__(self, host_ip=None, http_port=None, https_port=None):
"""
Initialize TsProxyServer.
Args:
host_ip: A string of the host ip address.
http_port: A decimal of the port used for http traffic.
https_port: a decimal of the port used for https traffic.
"""
self._proc = None
self._port = None
self._is_running = False
self._host_ip = host_ip
assert bool(http_port) == bool(https_port)
self._http_port = http_port
self._https_port = https_port
self._non_blocking = False
self._rtt = None
self._inbkps = None
self._outkbps = None
@property
def port(self):
return self._port
@retry_util.RetryOnException(TsProxyServerError, retries=3)
def StartServer(self, timeout=10, retries=None):
"""Start TsProxy server and verify that it started."""
del retries # Handled by decorator.
cmd_line = [sys.executable, _TSPROXY_PATH]
# Use port 0 so tsproxy picks a random available port.
cmd_line.extend(['--port=0'])
if self._host_ip:
cmd_line.append('--desthost=%s' % self._host_ip)
if self._http_port:
cmd_line.append(
'--mapports=443:%s,*:%s' % (self._https_port, self._http_port))
logging.info('Tsproxy commandline: %s', cmd_line)
# In python3 subprocess handles encoding/decoding; this warns if it won't
# be UTF-8.
if locale.getpreferredencoding() != 'UTF-8':
logging.warning('Decoding will use %s instead of UTF-8',
locale.getpreferredencoding())
# In python3 universal_newlines forces subprocess to encode/decode,
# allowing per-line buffering.
self._proc = subprocess.Popen(
cmd_line, stdout=subprocess.PIPE, stdin=subprocess.PIPE,
stderr=subprocess.PIPE, bufsize=1, universal_newlines=True)
self._non_blocking = False
if fcntl:
logging.info('fcntl is supported, trying to set '
'non blocking I/O for the ts_proxy process')
fd = self._proc.stdout.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self._non_blocking = True
atexit_with_log.Register(self.StopServer)
try:
py_utils.WaitFor(self._IsStarted, timeout)
logging.info('TsProxy port: %s', self._port)
self._is_running = True
except py_utils.TimeoutException:
err = self.StopServer()
if err:
logging.error('Error stopping WPR server:\n%s', err)
six.raise_from(TsProxyServerError(
'Error starting tsproxy: timed out after %s seconds' % timeout), None)
def _IsStarted(self):
assert not self._is_running
assert self._proc
if self._proc.poll() is not None:
return False
self._proc.stdout.flush()
output_line = self._ReadLineTsProxyStdout(timeout=5)
logging.debug('TsProxy output: %s', output_line)
self._port = ParseTsProxyPortFromOutput(output_line)
return self._port is not None
def _ReadLineTsProxyStdout(self, timeout):
def ReadSingleLine():
try:
return self._proc.stdout.readline().strip()
except IOError:
# Add a sleep to avoid trying to read self._proc.stdout too often.
if self._non_blocking:
time.sleep(0.5)
return None
return py_utils.WaitFor(ReadSingleLine, timeout)
@retry_util.RetryOnException(TsProxyServerError, retries=3)
def _IssueCommand(self, command_string, timeout, retries=None):
del retries # handled by the decorator
logging.info('Issuing command to ts_proxy_server: %s', command_string)
command_output = []
self._proc.stdin.write(('%s\n' % command_string))
def CommandStatusIsRead():
self._proc.stdin.flush()
self._proc.stdout.flush()
command_output.append(self._ReadLineTsProxyStdout(timeout))
return command_output[-1] == 'OK' or command_output[-1] == 'ERROR'
py_utils.WaitFor(CommandStatusIsRead, timeout)
success = 'OK' in command_output
logging.log(logging.DEBUG if success else logging.ERROR,
'TsProxy output:\n%s', '\n'.join(command_output))
if not success:
six.raise_from(TsProxyServerError('Failed to execute command: %s',
command_string), None)
def UpdateOutboundPorts(self, http_port, https_port, timeout=5):
assert http_port and https_port
assert http_port != https_port
assert isinstance(http_port, int) and isinstance(https_port, int)
assert 1 <= http_port <= 65535
assert 1 <= https_port <= 65535
self._IssueCommand('set mapports 443:%i,*:%i' % (https_port, http_port),
timeout)
def UpdateTrafficSettings(
self, round_trip_latency_ms=None,
download_bandwidth_kbps=None, upload_bandwidth_kbps=None, timeout=20):
"""Update traffic settings of the proxy server.
Notes that this method only updates the specified parameter.
"""
# Memorize the traffic settings & only execute the command if the traffic
# settings are different.
if round_trip_latency_ms is not None and self._rtt != round_trip_latency_ms:
self._IssueCommand('set rtt %s' % round_trip_latency_ms, timeout)
self._rtt = round_trip_latency_ms
if (download_bandwidth_kbps is not None and
self._inbkps != download_bandwidth_kbps):
self._IssueCommand('set inkbps %s' % download_bandwidth_kbps, timeout)
self._inbkps = download_bandwidth_kbps
if (upload_bandwidth_kbps is not None and
self._outkbps != upload_bandwidth_kbps):
self._IssueCommand('set outkbps %s' % upload_bandwidth_kbps, timeout)
self._outkbps = upload_bandwidth_kbps
def StopServer(self):
"""Stop TsProxy Server."""
if not self._is_running:
logging.debug('Attempting to stop TsProxy server that is not running.')
return None
if not self._proc:
return None
try:
self._IssueCommand('exit', timeout=10)
py_utils.WaitFor(lambda: self._proc.poll() is not None, 10)
except py_utils.TimeoutException:
# signal.SIGINT is not supported on Windows.
if not sys.platform.startswith('win'):
try:
# Use a SIGNINT so that it can do graceful cleanup
self._proc.send_signal(signal.SIGINT)
except ValueError:
logging.warning('Unable to stop ts_proxy_server gracefully.\n')
self._proc.terminate()
_, err = self._proc.communicate()
self._proc = None
self._port = None
self._is_running = False
self._rtt = None
self._inbkps = None
self._outkbps = None
return err
def __enter__(self):
"""Add support for with-statement."""
self.StartServer()
return self
def __exit__(self, unused_exc_type, unused_exc_val, unused_exc_tb):
"""Add support for with-statement."""
self.StopServer()