#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

import datetime
import json
import logging
import os
import StringIO
import sys
import tempfile
import threading
import time
import traceback

# Mutates sys.path.
import test_env

# third_party/
from depot_tools import auto_stub

import isolateserver_fake
import net_utils
import swarmingserver_fake

import auth
import isolateserver
import local_caching
import swarming
from utils import file_path
from utils import logging_utils
from utils import subprocess42
from utils import tools


FILE_HASH = u'1' * 40
TEST_NAME = u'unit_tests'

OUTPUT = 'Ran stuff\n'

SHARD_OUTPUT_1 = 'Shard 1 of 3.'
SHARD_OUTPUT_2 = 'Shard 2 of 3.'
SHARD_OUTPUT_3 = 'Shard 3 of 3.'


def gen_yielded_data(index, **kwargs):
  """Returns an entry as it would be yielded by yield_results()."""
  return index, gen_result_response(**kwargs)


def get_results(keys, output_collector=None):
  """Simplifies the call to yield_results().

  The timeout is hard-coded to 10 seconds.
  """
  return list(
      swarming.yield_results(
          'https://host:9001', keys, 10., None, True,
          output_collector, False, True))


def collect(url, task_ids, task_stdout=('console', 'json')):
  """Simplifies the call to swarming.collect()."""
  return swarming.collect(
    swarming=url,
    task_ids=task_ids,
    timeout=10,
    decorate=True,
    print_status_updates=True,
    task_summary_json=None,
    task_output_dir=None,
    task_output_stdout=task_stdout,
    include_perf=False,
    filepath_filter='.*')


def main(args):
  """Bypasses swarming.main()'s exception handling.

  It gets in the way when debugging test failures.
  """
  dispatcher = swarming.subcommand.CommandDispatcher('swarming')
  return dispatcher.execute(swarming.OptionParserSwarming(), args)


def gen_properties(**kwargs):
  out = {
    'caches': [],
    'cipd_input': None,
    'command': None,
    'relative_cwd': None,
    'dimensions': [
      {'key': 'os', 'value': 'Mac'},
      {'key': 'pool', 'value': 'default'},
    ],
    'env': [],
    'env_prefixes': [],
    'execution_timeout_secs': 60,
    'extra_args': ['--some-arg', '123'],
    'grace_period_secs': 30,
    'idempotent': False,
    'inputs_ref': {
      'isolated': None,
      'isolatedserver': '',
      'namespace': 'default-gzip',
    },
    'io_timeout_secs': 60,
    'outputs': [],
    'secret_bytes': None,
  }
  out.update(kwargs)
  return out


def gen_request_data(properties=None, **kwargs):
  out = {
    'name': 'unit_tests',
    'parent_task_id': '',
    'pool_task_template': 'AUTO',
    'priority': 101,
    'task_slices': [
      {
        'expiration_secs': 3600,
        'properties': gen_properties(**(properties or {})),
        'wait_for_capacity': False,
      },
    ],
    'tags': ['tag:a', 'tag:b'],
    'user': 'joe@localhost',
  }
  out.update(kwargs)
  return out


def gen_request_response(request, **kwargs):
  # As seen in services/swarming/handlers_api.py.
  out = {
    'request': request.copy(),
    'task_id': '12300',
  }
  out.update(kwargs)
  return out


def gen_result_response(**kwargs):
  out = {
    u'bot_id': u'swarm6',
    u'completed_ts': u'2014-09-24T13:49:16.012345',
    u'created_ts': u'2014-09-24T13:49:03.012345',
    u'duration': 0.9636809825897217,
    u'exit_code': 0,
    u'failure': False,
    u'internal_failure': False,
    u'modified_ts': u'2014-09-24T13:49:17.012345',
    u'name': u'heartbeat-canary-2014-09-24_13:49:01-os=Ubuntu',
    u'server_versions': [u'1'],
    u'started_ts': u'2014-09-24T13:49:09.012345',
    u'state': 'COMPLETED',
    u'tags': [u'cpu:x86', u'priority:200', u'user:joe@localhost'],
    u'task_id': u'10100',
    u'try_number': 1,
    u'user': u'joe@localhost',
  }
  out.update(kwargs)
  return out


class NonBlockingEvent(threading._Event):
  """Just like threading.Event, but a class and ignores timeout in 'wait'.

  Intended to be used as a mock for threading.Event in tests.
  """

  def wait(self, timeout=None):
    return super(NonBlockingEvent, self).wait(0)


class Common(object):
  # pylint: disable=no-member

  def setUp(self):
    self._tempdir = None
    self.mock(auth, 'ensure_logged_in', lambda _: None)
    self.mock(sys, 'stdout', StringIO.StringIO())
    self.mock(sys, 'stderr', StringIO.StringIO())
    self.mock(logging_utils, 'prepare_logging', lambda *args: None)
    self.mock(logging_utils, 'set_console_level', lambda *args: None)

  def tearDown(self):
    if self._tempdir:
      file_path.rmtree(self._tempdir)
    if not self.has_failed():
      self._check_output('', '')

  @property
  def tempdir(self):
    """Creates the directory on first reference."""
    if not self._tempdir:
      self._tempdir = tempfile.mkdtemp(prefix=u'swarming_test')
    return self._tempdir

  maxDiff = None
  def _check_output(self, out, err):
    self.assertMultiLineEqual(out, sys.stdout.getvalue())
    self.assertMultiLineEqual(err, sys.stderr.getvalue())

    # Flush their content by mocking them again.
    self.mock(sys, 'stdout', StringIO.StringIO())
    self.mock(sys, 'stderr', StringIO.StringIO())

  def main_safe(self, args):
    """Bypasses swarming.main()'s exception handling.

    It gets in the way when debugging test failures.
    """
    # pylint: disable=bare-except
    try:
      return main(args)
    except:
      data = '%s\nSTDOUT:\n%s\nSTDERR:\n%s' % (
          traceback.format_exc(), sys.stdout.getvalue(), sys.stderr.getvalue())
      self.fail(data)


class NetTestCase(net_utils.TestCase, Common):
  """Base class that defines the url_open mock."""
  def setUp(self):
    net_utils.TestCase.setUp(self)
    Common.setUp(self)
    self.mock(time, 'sleep', lambda _: None)
    self.mock(subprocess42, 'call', lambda *_: self.fail())
    self.mock(threading, 'Event', NonBlockingEvent)


class TestIsolated(auto_stub.TestCase, Common):
  """Test functions with isolated_ prefix."""
  def setUp(self):
    auto_stub.TestCase.setUp(self)
    Common.setUp(self)
    self._isolate = isolateserver_fake.FakeIsolateServer()
    self._swarming = swarmingserver_fake.FakeSwarmingServer()

  def tearDown(self):
    try:
      self._isolate.close()
      self._swarming.close()
    finally:
      Common.tearDown(self)
      auto_stub.TestCase.tearDown(self)

  def test_reproduce_isolated(self):
    old_cwd = os.getcwd()
    try:
      os.chdir(self.tempdir)

      def call(cmd, env, cwd):
        # 'out' is the default value for --output-dir.
        outdir = os.path.join(self.tempdir, 'out')
        self.assertTrue(os.path.isdir(outdir))
        if sys.platform == 'darwin':
          # On macOS, python executable's path is a symlink and it's hard to
          # assess what will be passed to the command line. :/
          self.assertEqual(
              [u'main.py', u'foo', os.path.realpath(outdir), '--bar'], cmd[1:])
        else:
          self.assertEqual(
              [sys.executable, u'main.py', u'foo', os.path.realpath(outdir),
                '--bar'],
              cmd)
        expected = os.environ.copy()
        expected['SWARMING_TASK_ID'] = 'reproduce'
        expected['SWARMING_BOT_ID'] = 'reproduce'
        self.assertEqual(expected, env)
        self.assertEqual(unicode(os.path.abspath('work')), cwd)
        return 0

      self.mock(subprocess42, 'call', call)

      main_hash = self._isolate.add_content_compressed(
          'default-gzip', 'not executed')
      isolated = {
        'files': {
          'main.py': {
            'h': main_hash,
            's': 12,
            'm': 0700,
          },
        },
        'command': ['python', 'main.py'],
      }
      isolated_hash = self._isolate.add_content_compressed(
          'default-gzip', json.dumps(isolated))
      self._swarming._server.tasks[123] = {
        'properties': {
          'inputs_ref': {
            'isolatedserver': self._isolate.url,
            'namespace': 'default-gzip',
            'isolated': isolated_hash,
          },
          'extra_args': ['foo', '${ISOLATED_OUTDIR}'],
          'secret_bytes': None,
        },
      }
      ret = self.main_safe(
          [
            'reproduce', '--swarming', self._swarming.url, '123', '--',
            '--bar',
          ])
      self._check_output('', '')
      self.assertEqual(0, ret)
    finally:
      os.chdir(old_cwd)


class TestSwarmingTrigger(NetTestCase):
  def test_trigger_task_shards_2_shards(self):
    task_request = swarming.NewTaskRequest(
        name=TEST_NAME,
        parent_task_id=None,
        pool_task_template='AUTO',
        priority=101,
        task_slices=[
          swarming.TaskSlice(
              expiration_secs=60*60,
              properties=swarming.TaskProperties(
                  caches=[],
                  cipd_input=None,
                  command=['a', 'b'],
                  relative_cwd=None,
                  dimensions=[('os', 'Mac'), ('pool', 'default')],
                  env={},
                  env_prefixes=[],
                  execution_timeout_secs=60,
                  extra_args=[],
                  grace_period_secs=30,
                  idempotent=False,
                  inputs_ref={
                    'isolated': None,
                    'isolatedserver': '',
                    'namespace': 'default-gzip',
                  },
                  io_timeout_secs=60,
                  outputs=[],
                  secret_bytes=None),
              wait_for_capacity=False),
        ],
        service_account=None,
        tags=['tag:a', 'tag:b'],
        user='joe@localhost')

    request_1 = swarming.task_request_to_raw_request(task_request)
    request_1['name'] = u'unit_tests:0:2'
    request_1['task_slices'][0]['properties']['env'] = [
      {'key': 'GTEST_SHARD_INDEX', 'value': '0'},
      {'key': 'GTEST_TOTAL_SHARDS', 'value': '2'},
    ]
    result_1 = gen_request_response(request_1)

    request_2 = swarming.task_request_to_raw_request(task_request)
    request_2['name'] = u'unit_tests:1:2'
    request_2['task_slices'][0]['properties']['env'] = [
      {'key': 'GTEST_SHARD_INDEX', 'value': '1'},
      {'key': 'GTEST_TOTAL_SHARDS', 'value': '2'},
    ]
    result_2 = gen_request_response(request_2, task_id='12400')
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request_1},
            result_1,
          ),
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request_2},
            result_2,
          ),
        ])

    tasks = swarming.trigger_task_shards(
        swarming='https://localhost:1',
        task_request=task_request,
        shards=2)
    expected = {
      u'unit_tests:0:2': {
        'shard_index': 0,
        'task_id': '12300',
        'view_url': 'https://localhost:1/user/task/12300',
      },
      u'unit_tests:1:2': {
        'shard_index': 1,
        'task_id': '12400',
        'view_url': 'https://localhost:1/user/task/12400',
      },
    }
    self.assertEqual(expected, tasks)

  def test_trigger_task_shard_custom_index(self):
    task_request = swarming.NewTaskRequest(
        name=TEST_NAME,
        parent_task_id=None,
        pool_task_template='AUTO',
        priority=101,
        task_slices=[
          swarming.TaskSlice(
              expiration_secs=60*60,
              properties=swarming.TaskProperties(
                  caches=[],
                  cipd_input=None,
                  command=['a', 'b'],
                  relative_cwd=None,
                  dimensions=[('os', 'Mac'), ('pool', 'default')],
                  env={'GTEST_SHARD_INDEX' : '2', 'GTEST_TOTAL_SHARDS' : '4'},
                  env_prefixes=[],
                  execution_timeout_secs=60,
                  extra_args=[],
                  grace_period_secs=30,
                  idempotent=False,
                  inputs_ref={
                    'isolated': None,
                    'isolatedserver': '',
                    'namespace': 'default-gzip',
                  },
                  io_timeout_secs=60,
                  outputs=[],
                  secret_bytes=None),
              wait_for_capacity=False),
        ],
        service_account=None,
        tags=['tag:a', 'tag:b'],
        user='joe@localhost')

    request_1 = swarming.task_request_to_raw_request(task_request)
    request_1['name'] = u'unit_tests:2:4'
    request_1['task_slices'][0]['properties']['env'] = [
      {'key': 'GTEST_SHARD_INDEX', 'value': '2'},
      {'key': 'GTEST_TOTAL_SHARDS', 'value': '4'},
    ]
    result_1 = gen_request_response(request_1)

    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request_1},
            result_1,
          ),
        ])

    tasks = swarming.trigger_task_shards(
        swarming='https://localhost:1',
        task_request=task_request,
        shards=1)
    expected = {
      u'unit_tests:2:4': {
        'shard_index': 2,
        'task_id': '12300',
        'view_url': 'https://localhost:1/user/task/12300',
      },
    }
    self.assertEqual(expected, tasks)

  def test_trigger_task_shards_priority_override(self):
    task_request = swarming.NewTaskRequest(
        name=TEST_NAME,
        parent_task_id='123',
        pool_task_template='AUTO',
        priority=101,
        task_slices=[
          swarming.TaskSlice(
              expiration_secs=60*60,
              properties=swarming.TaskProperties(
                  caches=[],
                  cipd_input=None,
                  command=['a', 'b'],
                  relative_cwd=None,
                  dimensions=[('os', 'Mac'), ('pool', 'default')],
                  env={},
                  env_prefixes=[],
                  execution_timeout_secs=60,
                  extra_args=[],
                  grace_period_secs=30,
                  idempotent=False,
                  inputs_ref={
                    'isolated': None,
                    'isolatedserver': '',
                    'namespace': 'default-gzip',
                  },
                  io_timeout_secs=60,
                  outputs=[],
                  secret_bytes=None),
              wait_for_capacity=False),
        ],
        service_account=None,
        tags=['tag:a', 'tag:b'],
        user='joe@localhost')

    request = swarming.task_request_to_raw_request(task_request)
    self.assertEqual('123', request['parent_task_id'])

    result = gen_request_response(request)
    result['request']['priority'] = 200
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request},
            result,
          ),
        ])

    os.environ['SWARMING_TASK_ID'] = '123'
    try:
      tasks = swarming.trigger_task_shards(
          swarming='https://localhost:1',
          shards=1,
          task_request=task_request)
    finally:
      os.environ.pop('SWARMING_TASK_ID')
    expected = {
      u'unit_tests': {
        'shard_index': 0,
        'task_id': '12300',
        'view_url': 'https://localhost:1/user/task/12300',
      }
    }
    self.assertEqual(expected, tasks)
    self._check_output('', 'Priority was reset to 200\n')

  def test_trigger_cipd_package(self):
    task_request = swarming.NewTaskRequest(
        name=TEST_NAME,
        parent_task_id='123',
        pool_task_template='AUTO',
        priority=101,
        task_slices=[
          swarming.TaskSlice(
              expiration_secs=60*60,
              properties=swarming.TaskProperties(
                  caches=[],
                  cipd_input=swarming.CipdInput(
                      client_package=None,
                      packages=[
                          swarming.CipdPackage(
                              package_name='mypackage',
                              path='path/to/package',
                              version='abc123')],
                      server=None),
                  command=['a', 'b'],
                  relative_cwd=None,
                  dimensions=[('os', 'Mac'), ('pool', 'default')],
                  env={},
                  env_prefixes=[],
                  execution_timeout_secs=60,
                  extra_args=[],
                  grace_period_secs=30,
                  idempotent=False,
                  inputs_ref={
                    'isolated': None,
                    'isolatedserver': '',
                    'namespace': 'default-gzip',
                  },
                  io_timeout_secs=60,
                  outputs=[],
                  secret_bytes=None),
              wait_for_capacity=False),
        ],
        service_account=None,
        tags=['tag:a', 'tag:b'],
        user='joe@localhost')

    request = swarming.task_request_to_raw_request(task_request)
    expected = {
      'client_package': None,
      'packages': [{
          'package_name': 'mypackage',
          'path': 'path/to/package',
          'version': 'abc123',
      }],
      'server': None
    }
    self.assertEqual(
        expected, request['task_slices'][0]['properties']['cipd_input'])

    result = gen_request_response(request)
    result['request']['priority'] = 200
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request},
            result,
          ),
        ])

    os.environ['SWARMING_TASK_ID'] = '123'
    try:
      tasks = swarming.trigger_task_shards(
          swarming='https://localhost:1',
          shards=1,
          task_request=task_request)
    finally:
      os.environ.pop('SWARMING_TASK_ID')
    expected = {
      u'unit_tests': {
        'shard_index': 0,
        'task_id': '12300',
        'view_url': 'https://localhost:1/user/task/12300',
      }
    }
    self.assertEqual(expected, tasks)
    self._check_output('', 'Priority was reset to 200\n')


class TestSwarmingCollection(NetTestCase):
  def test_success(self):
    self.expected_requests(
        [
          (
            'https://host:9001/_ah/api/swarming/v1/task/10100/result',
            {'retry_50x': False},
            gen_result_response(),
          ),
          (
            'https://host:9001/_ah/api/swarming/v1/task/10100/stdout',
            {},
            {'output': OUTPUT},
          ),
        ])
    expected = [gen_yielded_data(0, output=OUTPUT)]
    self.assertEqual(expected, get_results(['10100']))

  def test_failure(self):
    self.expected_requests(
        [
          (
            'https://host:9001/_ah/api/swarming/v1/task/10100/result',
            {'retry_50x': False},
            gen_result_response(exit_code=1),
          ),
          (
            'https://host:9001/_ah/api/swarming/v1/task/10100/stdout',
            {},
            {'output': OUTPUT},
          ),
        ])
    expected = [gen_yielded_data(0, output=OUTPUT, exit_code=1)]
    self.assertEqual(expected, get_results(['10100']))

  def test_no_ids(self):
    actual = get_results([])
    self.assertEqual([], actual)

  def test_url_errors(self):
    self.mock(logging, 'error', lambda *_, **__: None)
    # NOTE: get_results() hardcodes timeout=10.
    now = {}
    lock = threading.Lock()
    def get_now():
      t = threading.current_thread()
      with lock:
        return now.setdefault(t, range(10)).pop(0)
    self.mock(swarming.net, 'sleep_before_retry', lambda _x, _y: None)
    self.mock(swarming, 'now', get_now)
    # The actual number of requests here depends on 'now' progressing to 10
    # seconds. It's called once per loop. Loop makes 9 iterations.
    self.expected_requests(
        9 * [
          (
            'https://host:9001/_ah/api/swarming/v1/task/10100/result',
            {'retry_50x': False},
            None,
          )
        ])
    actual = get_results(['10100'])
    self.assertEqual([], actual)
    self.assertTrue(all(not v for v in now.itervalues()), now)

  def test_many_shards(self):
    self.expected_requests(
        [
          (
            'https://host:9001/_ah/api/swarming/v1/task/10100/result',
            {'retry_50x': False},
            gen_result_response(),
          ),
          (
            'https://host:9001/_ah/api/swarming/v1/task/10100/stdout',
            {},
            {'output': SHARD_OUTPUT_1},
          ),
          (
            'https://host:9001/_ah/api/swarming/v1/task/10200/result',
            {'retry_50x': False},
            gen_result_response(),
          ),
          (
            'https://host:9001/_ah/api/swarming/v1/task/10200/stdout',
            {},
            {'output': SHARD_OUTPUT_2},
          ),
          (
            'https://host:9001/_ah/api/swarming/v1/task/10300/result',
            {'retry_50x': False},
            gen_result_response(),
          ),
          (
            'https://host:9001/_ah/api/swarming/v1/task/10300/stdout',
            {},
            {'output': SHARD_OUTPUT_3},
          ),
        ])
    expected = [
      gen_yielded_data(0, output=SHARD_OUTPUT_1),
      gen_yielded_data(1, output=SHARD_OUTPUT_2),
      gen_yielded_data(2, output=SHARD_OUTPUT_3),
    ]
    actual = get_results(['10100', '10200', '10300'])
    self.assertEqual(expected, sorted(actual))

  def test_output_collector_called(self):
    # Three shards, one failed. All results are passed to output collector.
    self.expected_requests(
        [
          (
            'https://host:9001/_ah/api/swarming/v1/task/10100/result',
            {'retry_50x': False},
            gen_result_response(),
          ),
          (
            'https://host:9001/_ah/api/swarming/v1/task/10100/stdout',
            {},
            {'output': SHARD_OUTPUT_1},
          ),
          (
            'https://host:9001/_ah/api/swarming/v1/task/10200/result',
            {'retry_50x': False},
            gen_result_response(),
          ),
          (
            'https://host:9001/_ah/api/swarming/v1/task/10200/stdout',
            {},
            {'output': SHARD_OUTPUT_2},
          ),
          (
            'https://host:9001/_ah/api/swarming/v1/task/10300/result',
            {'retry_50x': False},
            gen_result_response(exit_code=1),
          ),
          (
            'https://host:9001/_ah/api/swarming/v1/task/10300/stdout',
            {},
            {'output': SHARD_OUTPUT_3},
          ),
        ])

    class FakeOutputCollector(object):
      def __init__(self):
        self.results = []
        self._lock = threading.Lock()

      def process_shard_result(self, index, result):
        with self._lock:
          self.results.append((index, result))

    output_collector = FakeOutputCollector()
    get_results(['10100', '10200', '10300'], output_collector)

    expected = [
      gen_yielded_data(0, output=SHARD_OUTPUT_1),
      gen_yielded_data(1, output=SHARD_OUTPUT_2),
      gen_yielded_data(2, output=SHARD_OUTPUT_3, exit_code=1),
    ]
    self.assertEqual(sorted(expected), sorted(output_collector.results))

  def test_collect_nothing(self):
    self.mock(swarming, 'yield_results', lambda *_: [])
    self.assertEqual(1, collect('https://localhost:1', ['10100', '10200']))
    self._check_output('', 'Results from some shards are missing: 0, 1\n')

  def test_collect_success(self):
    data = gen_result_response(output='Foo')
    self.mock(swarming, 'yield_results', lambda *_: [(0, data)])
    self.assertEqual(0, collect('https://localhost:1', ['10100']))
    expected = u'\n'.join((
      '+------------------------------------------------------+',
      '| Shard 0  https://localhost:1/user/task/10100         |',
      '+------------------------------------------------------+',
      'Foo',
      '+------------------------------------------------------+',
      '| End of shard 0                                       |',
      '|  Pending: 6.0s  Duration: 1.0s  Bot: swarm6  Exit: 0 |',
      '+------------------------------------------------------+',
      'Total duration: 1.0s',
      ''))
    self._check_output(expected, '')

  def test_collect_success_nostdout(self):
    data = gen_result_response(output='Foo')
    self.mock(swarming, 'yield_results', lambda *_: [(0, data)])
    self.assertEqual(0, collect('https://localhost:1', ['10100'], []))
    expected = u'\n'.join((
      '+------------------------------------------------------+',
      '| Shard 0  https://localhost:1/user/task/10100         |',
      '|  Pending: 6.0s  Duration: 1.0s  Bot: swarm6  Exit: 0 |',
      '+------------------------------------------------------+',
      'Total duration: 1.0s',
      ''))
    self._check_output(expected, '')

  def test_collect_fail(self):
    data = gen_result_response(output='Foo', exit_code=-9)
    data['output'] = 'Foo'
    self.mock(swarming, 'yield_results', lambda *_: [(0, data)])
    self.assertEqual(-9, collect('https://localhost:1', ['10100']))
    expected = u'\n'.join((
      '+-------------------------------------------------------+',
      '| Shard 0  https://localhost:1/user/task/10100          |',
      '+-------------------------------------------------------+',
      'Foo',
      '+-------------------------------------------------------+',
      '| End of shard 0                                        |',
      '|  Pending: 6.0s  Duration: 1.0s  Bot: swarm6  Exit: -9 |',
      '+-------------------------------------------------------+',
      'Total duration: 1.0s',
      ''))
    self._check_output(expected, '')

  def test_collect_one_missing(self):
    data = gen_result_response(output='Foo')
    data['output'] = 'Foo'
    self.mock(swarming, 'yield_results', lambda *_: [(0, data)])
    self.assertEqual(1, collect('https://localhost:1', ['10100', '10200']))
    expected = u'\n'.join((
      '+------------------------------------------------------+',
      '| Shard 0  https://localhost:1/user/task/10100         |',
      '+------------------------------------------------------+',
      'Foo',
      '+------------------------------------------------------+',
      '| End of shard 0                                       |',
      '|  Pending: 6.0s  Duration: 1.0s  Bot: swarm6  Exit: 0 |',
      '+------------------------------------------------------+',
      '',
      'Total duration: 1.0s',
      ''))
    self._check_output(expected, 'Results from some shards are missing: 1\n')

  def test_collect_multi(self):
    actual_calls = []
    def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
                       filepath_filter):
      self.assertIs(storage.__class__, isolateserver.Storage)
      self.assertIs(cache.__class__, local_caching.MemoryContentAddressedCache)
      # Ensure storage is pointing to required location.
      self.assertEqual('https://localhost:2', storage.server_ref.url)
      self.assertEqual('default', storage.server_ref.namespace)
      self.assertEqual(False, use_symlinks)
      self.assertEqual('.*', filepath_filter)
      actual_calls.append((isolated_hash, outdir))
    self.mock(isolateserver, 'fetch_isolated', fetch_isolated)

    collector = swarming.TaskOutputCollector(
        self.tempdir, ['json', 'console'], 2, '.*')
    for index in xrange(2):
      collector.process_shard_result(
          index,
          gen_result_response(
              outputs_ref={
                'isolated': str(index) * 40,
                'isolatedserver': 'https://localhost:2',
                'namespace': 'default',
              }))
    summary = collector.finalize()

    expected_calls = [
      ('0'*40, os.path.join(self.tempdir, '0')),
      ('1'*40, os.path.join(self.tempdir, '1')),
    ]
    self.assertEqual(expected_calls, actual_calls)

    # Ensure collected summary is correct.
    outputs_refs = [
      {
        'isolated': '0'*40,
        'isolatedserver': 'https://localhost:2',
        'namespace': 'default',
        'view_url':
            'https://localhost:2/browse?namespace=default&hash=' + '0'*40,
      },
      {
        'isolated': '1'*40,
        'isolatedserver': 'https://localhost:2',
        'namespace': 'default',
        'view_url':
            'https://localhost:2/browse?namespace=default&hash=' + '1'*40,
      },
    ]
    expected = {
      'shards': [gen_result_response(outputs_ref=o) for o in outputs_refs],
    }
    self.assertEqual(expected, summary)

    # Ensure summary dumped to a file is correct as well.
    with open(os.path.join(self.tempdir, 'summary.json'), 'r') as f:
      summary_dump = json.load(f)
    self.assertEqual(expected, summary_dump)

  def test_ensures_same_server(self):
    self.mock(logging, 'error', lambda *_: None)
    # Two shard results, attempt to use different servers.
    actual_calls = []
    self.mock(
        isolateserver, 'fetch_isolated',
        lambda *args: actual_calls.append(args))
    data = [
      gen_result_response(
        outputs_ref={
          'isolatedserver': 'https://server1',
          'namespace': 'namespace',
          'isolated':'hash1',
        }),
      gen_result_response(
        outputs_ref={
          'isolatedserver': 'https://server2',
          'namespace': 'namespace',
          'isolated':'hash1',
        }),
    ]

    # Feed them to collector.
    collector = swarming.TaskOutputCollector(
        self.tempdir, ['json', 'console'], 2, None)
    for index, result in enumerate(data):
      collector.process_shard_result(index, result)
    collector.finalize()

    # Only first fetch is made, second one is ignored.
    self.assertEqual(1, len(actual_calls))
    isolated_hash, storage, _, outdir, _, _ = actual_calls[0]
    self.assertEqual(
        ('hash1', os.path.join(self.tempdir, '0')),
        (isolated_hash, outdir))
    self.assertEqual('https://server1', storage.server_ref.url)


class TestMain(NetTestCase):
  # Tests calling main().
  def test_bot_delete(self):
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/bot/foo/delete',
            {'method': 'POST', 'data': {}},
            {},
          ),
        ])
    ret = self.main_safe(
        ['bot_delete', '--swarming', 'https://localhost:1', 'foo', '--force'])
    self._check_output('', '')
    self.assertEqual(0, ret)

  def test_trigger_raw_cmd(self):
    # Minimalist use.
    request = {
      'name': u'None/pool=default',
      'parent_task_id': '',
      'pool_task_template': 'AUTO',
      'priority': 200,
      'task_slices': [
        {
          'expiration_secs': 21600,
          'properties': gen_properties(
              command=['python', '-c', 'print(\'hi\')'],
              dimensions=[{'key': 'pool', 'value': 'default'}],
              execution_timeout_secs=3600,
              extra_args=None,
              inputs_ref=None,
              io_timeout_secs=1200,
              relative_cwd='deeep'),
          'wait_for_capacity': False,
        },
      ],
      'tags': [],
      'user': None,
    }
    result = gen_request_response(request)
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request},
            result,
          ),
        ])
    ret = self.main_safe([
        'trigger',
        '--swarming', 'https://localhost:1',
        '--dimension', 'pool', 'default',
        '--raw-cmd',
        '--relative-cwd', 'deeep',
        '--',
        'python',
        '-c',
        'print(\'hi\')',
      ])
    # pylint: disable=no-member
    actual = sys.stdout.getvalue()
    self.assertEqual(0, ret, (actual, sys.stderr.getvalue()))
    self._check_output(
        'Triggered task: None/pool=default\n'
        'To collect results, use:\n'
        '  tools/swarming_client/swarming.py collect '
        '-S https://localhost:1 12300\n'
        'Or visit:\n'
        '  https://localhost:1/user/task/12300\n',
        '')

  def test_trigger_raw_cmd_with_optional(self):
    request = {
      'name': u'None/caches=c1_foo=bar_foo1=bar1_pool=default',
      'parent_task_id': '',
      'pool_task_template': 'AUTO',
      'priority': 200,
      'task_slices': [
       {
          'expiration_secs': 60,
          'properties': gen_properties(
              command=['python', '-c', 'print(\'hi\')'],
              dimensions=[
                  {'key': 'caches', 'value': 'c1'},
                  {'key': 'caches', 'value': 'c2'},
                  {'key': 'foo', 'value': 'baz'},
                  {'key': 'foo1', 'value': 'baz1'},
                  {'key': 'opt', 'value': 'tional'},
                  {'key': 'pool', 'value': 'default'},
              ],
              execution_timeout_secs=3600,
              extra_args=None,
              inputs_ref=None,
              io_timeout_secs=1200,
              relative_cwd='deeep'),
          'wait_for_capacity': False,
        },
        {
          'expiration_secs': 120,
          'properties': gen_properties(
              command=['python', '-c', 'print(\'hi\')'],
              dimensions=[
                  {'key': 'caches', 'value': 'c1'},
                  {'key': 'caches', 'value': 'c2'},
                  {'key': 'foo', 'value': 'bar'},
                  {'key': 'foo1', 'value': 'baz1'},
                  {'key': 'pool', 'value': 'default'},
              ],
              execution_timeout_secs=3600,
              extra_args=None,
              inputs_ref=None,
              io_timeout_secs=1200,
              relative_cwd='deeep'),
          'wait_for_capacity': False,
        },
        {
          'expiration_secs': 21420,
          'properties': gen_properties(
              command=['python', '-c', 'print(\'hi\')'],
              dimensions=[
                  {'key': 'caches', 'value': 'c1'},
                  {'key': 'foo', 'value': 'bar'},
                  {'key': 'foo1', 'value': 'bar1'},
                  {'key': 'pool', 'value': 'default'},
              ],
              execution_timeout_secs=3600,
              extra_args=None,
              inputs_ref=None,
              io_timeout_secs=1200,
              relative_cwd='deeep'),
          'wait_for_capacity': False,
        },
        ],
      'tags': [],
      'user': None,
    }
    result = gen_request_response(request)
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request},
            result,
          ),
        ])
    ret = self.main_safe([
        'trigger',
        '--swarming', 'https://localhost:1',
        '--dimension', 'pool', 'default',
        '--dimension', 'foo', 'bar',
        '--dimension', 'foo1', 'bar1',
        '--dimension', 'caches', 'c1',
        '--optional-dimension', 'foo', 'baz', 60,
        '--optional-dimension', 'opt', 'tional', 60,
        '--optional-dimension', 'foo1', 'baz1', 180,
        '--optional-dimension', 'caches', 'c2', 180,
        '--raw-cmd',
        '--relative-cwd', 'deeep',
        '--',
        'python',
        '-c',
        'print(\'hi\')',
      ])
    # pylint: disable=no-member
    actual = sys.stdout.getvalue()
    self.assertEqual(0, ret, (actual, sys.stderr.getvalue()))
    self._check_output(
        'Triggered task: None/caches=c1_foo=bar_foo1=bar1_pool=default\n'
        'To collect results, use:\n'
        '  tools/swarming_client/swarming.py collect '
        '-S https://localhost:1 12300\n'
        'Or visit:\n'
        '  https://localhost:1/user/task/12300\n',
        '')

  def test_trigger_raw_cmd_with_optional_unsorted(self):
    request = {
      'name': u'None/foo1=bar1_os=Mac-10.12.6_pool=default',
      'parent_task_id': '',
      'pool_task_template': 'AUTO',
      'priority': 200,
      'task_slices': [
       {
          'expiration_secs': 60,
          'properties': gen_properties(
              command=['python', '-c', 'print(\'hi\')'],
              dimensions=[
                  {'key': 'foo1', 'value': 'baz1'},
                  {'key': 'os', 'value': 'Mac-10.13'},
                  {'key': 'pool', 'value': 'default'},
              ],
              execution_timeout_secs=3600,
              extra_args=None,
              inputs_ref=None,
              io_timeout_secs=1200,
              relative_cwd='deeep'),
          'wait_for_capacity': False,
        },
        {
          'expiration_secs': 60,
          'properties': gen_properties(
              command=['python', '-c', 'print(\'hi\')'],
              dimensions=[
                  {'key': 'foo1', 'value': 'baz1'},
                  {'key': 'os', 'value': 'Mac-10.12.6'},
                  {'key': 'pool', 'value': 'default'},
              ],
              execution_timeout_secs=3600,
              extra_args=None,
              inputs_ref=None,
              io_timeout_secs=1200,
              relative_cwd='deeep'),
          'wait_for_capacity': False,
        },
        {
          'expiration_secs': 21480,
          'properties': gen_properties(
              command=['python', '-c', 'print(\'hi\')'],
              dimensions=[
                  {'key': 'foo1', 'value': 'bar1'},
                  {'key': 'os', 'value': 'Mac-10.12.6'},
                  {'key': 'pool', 'value': 'default'},
              ],
              execution_timeout_secs=3600,
              extra_args=None,
              inputs_ref=None,
              io_timeout_secs=1200,
              relative_cwd='deeep'),
          'wait_for_capacity': False,
        },
        ],
      'tags': [],
      'user': None,
    }
    result = gen_request_response(request)
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request},
            result,
          ),
        ])
    ret = self.main_safe([
        'trigger',
        '--swarming', 'https://localhost:1',
        '--dimension', 'os', 'Mac-10.12.6',
        '--dimension', 'pool', 'default',
        '--dimension', 'foo1', 'bar1',
        '--optional-dimension', 'foo1', 'baz1', 120,
        '--optional-dimension', 'os', 'Mac-10.13', 60,
        '--raw-cmd',
        '--relative-cwd', 'deeep',
        '--',
        'python',
        '-c',
        'print(\'hi\')',
      ])
    # pylint: disable=no-member
    actual = sys.stdout.getvalue()
    self.assertEqual(0, ret, (actual, sys.stderr.getvalue()))
    self._check_output(
        'Triggered task: None/foo1=bar1_os=Mac-10.12.6_pool=default\n'
        'To collect results, use:\n'
        '  tools/swarming_client/swarming.py collect '
        '-S https://localhost:1 12300\n'
        'Or visit:\n'
        '  https://localhost:1/user/task/12300\n',
        '')

  def test_trigger_raw_cmd_with_optional_sameexp(self):
    request = {
      'name': u'None/foo=bar_foo1=bar1_pool=default',
      'parent_task_id': '',
      'pool_task_template': 'AUTO',
      'priority': 200,
      'task_slices': [
       {
          'expiration_secs': 60,
          'properties': gen_properties(
              command=['python', '-c', 'print(\'hi\')'],
              dimensions=[
                  {'key': 'foo', 'value': 'baz'},
                  {'key': 'foo1', 'value': 'bar1'},
                  {'key': 'foo2', 'value': 'baz2'},
                  {'key': 'pool', 'value': 'default'},
              ],
              execution_timeout_secs=3600,
              extra_args=None,
              inputs_ref=None,
              io_timeout_secs=1200,
              relative_cwd='deeep'),
          'wait_for_capacity': False,
        },
        {
          'expiration_secs': 21540, # 21600 - 60
          'properties': gen_properties(
              command=['python', '-c', 'print(\'hi\')'],
              dimensions=[
                  {'key': 'foo', 'value': 'bar'},
                  {'key': 'foo1', 'value': 'bar1'},
                  {'key': 'pool', 'value': 'default'},
              ],
              execution_timeout_secs=3600,
              extra_args=None,
              inputs_ref=None,
              io_timeout_secs=1200,
              relative_cwd='deeep'),
          'wait_for_capacity': False,
        },
        ],
      'tags': [],
      'user': None,
    }
    result = gen_request_response(request)
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request},
            result,
          ),
        ])
    ret = self.main_safe([
        'trigger',
        '--swarming', 'https://localhost:1',
        '--dimension', 'pool', 'default',
        '--dimension', 'foo', 'bar',
        '--dimension', 'foo1', 'bar1',
        '--optional-dimension', 'foo', 'baz', 60,
        '--optional-dimension', 'foo2', 'baz2', 60,
        '--raw-cmd',
        '--relative-cwd', 'deeep',
        '--',
        'python',
        '-c',
        'print(\'hi\')',
      ])
    # pylint: disable=no-member
    actual = sys.stdout.getvalue()
    self.assertEqual(0, ret, (actual, sys.stderr.getvalue()))
    self._check_output(
        'Triggered task: None/foo=bar_foo1=bar1_pool=default\n'
        'To collect results, use:\n'
        '  tools/swarming_client/swarming.py collect '
        '-S https://localhost:1 12300\n'
        'Or visit:\n'
        '  https://localhost:1/user/task/12300\n',
        '')

  def test_trigger_raw_cmd_isolated(self):
    # Minimalist use.
    request = {
      'name': u'None/pool=default/' + FILE_HASH,
      'parent_task_id': '',
      'pool_task_template': 'AUTO',
      'priority': 200,
      'task_slices': [
        {
          'expiration_secs': 21600,
          'properties': gen_properties(
              command=['python', '-c', 'print(\'hi\')'],
              dimensions=[{'key': 'pool', 'value': 'default'}],
              execution_timeout_secs=3600,
              extra_args=None,
              inputs_ref={
                'isolated': u'1111111111111111111111111111111111111111',
                'isolatedserver': 'https://localhost:2',
                'namespace': 'default-gzip',
              },
              io_timeout_secs=1200),
          'wait_for_capacity': False,
        },
      ],
      'tags': [],
      'user': None,
    }
    result = gen_request_response(request)
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request},
            result,
          ),
        ])
    ret = self.main_safe([
        'trigger',
        '--swarming', 'https://localhost:1',
        '--dimension', 'pool', 'default',
        '--raw-cmd',
        '--isolate-server', 'https://localhost:2',
        '--isolated', FILE_HASH,
        '--',
        'python',
        '-c',
        'print(\'hi\')',
      ])
    # pylint: disable=no-member
    actual = sys.stdout.getvalue()
    self.assertEqual(0, ret, (actual, sys.stderr.getvalue()))
    self._check_output(
        u'Triggered task: None/pool=default/' + FILE_HASH + u'\n'
        u'To collect results, use:\n'
        u'  tools/swarming_client/swarming.py collect '
        u'-S https://localhost:1 12300\n'
        u'Or visit:\n'
        u'  https://localhost:1/user/task/12300\n',
        u'')

  def test_trigger_raw_cmd_with_service_account(self):
    # Minimalist use.
    request = {
      'name': u'None/pool=default',
      'parent_task_id': '',
      'pool_task_template': 'AUTO',
      'priority': 200,
      'task_slices': [
        {
          'expiration_secs': 21600,
          'properties': gen_properties(
              command=['python', '-c', 'print(\'hi\')'],
              dimensions=[{'key': 'pool', 'value': 'default'}],
              execution_timeout_secs=3600,
              extra_args=None,
              inputs_ref=None,
              io_timeout_secs=1200),
          'wait_for_capacity': False,
        },
      ],
      'service_account': 'bot',
      'tags': [],
      'user': None,
    }
    result = gen_request_response(request)
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request},
            result,
          ),
        ])
    ret = self.main_safe([
        'trigger',
        '--swarming', 'https://localhost:1',
        '--dimension', 'pool', 'default',
        '--service-account', 'bot',
        '--raw-cmd',
        '--',
        'python',
        '-c',
        'print(\'hi\')',
      ])
    # pylint: disable=no-member
    actual = sys.stdout.getvalue()
    self.assertEqual(0, ret, (actual, sys.stderr.getvalue()))
    self._check_output(
        'Triggered task: None/pool=default\n'
        'To collect results, use:\n'
        '  tools/swarming_client/swarming.py collect '
        '-S https://localhost:1 12300\n'
        'Or visit:\n'
        '  https://localhost:1/user/task/12300\n',
        '')

  def test_trigger_isolated_hash(self):
    self.mock(swarming, 'now', lambda: 123456)

    request = gen_request_data(
        task_slices=[
          {
            'expiration_secs': 3600,
            'properties': gen_properties(
                inputs_ref={
                  'isolated': u'1111111111111111111111111111111111111111',
                  'isolatedserver': 'https://localhost:2',
                  'namespace': 'default-gzip',
                }),
            'wait_for_capacity': False,
          },
        ])
    result = gen_request_response(request)
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request},
            result,
          ),
        ])
    ret = self.main_safe([
        'trigger',
        '--swarming', 'https://localhost:1',
        '--isolate-server', 'https://localhost:2',
        '--shards', '1',
        '--priority', '101',
        '--dimension', 'os', 'Mac',
        '--dimension', 'pool', 'default',
        '--expiration', '3600',
        '--user', 'joe@localhost',
        '--tags', 'tag:a',
        '--tags', 'tag:b',
        '--hard-timeout', '60',
        '--io-timeout', '60',
        '--task-name', 'unit_tests',
        '--isolated', FILE_HASH,
        '--',
        '--some-arg',
        '123',
      ])
    # pylint: disable=no-member
    actual = sys.stdout.getvalue()
    self.assertEqual(0, ret, (actual, sys.stderr.getvalue()))
    self._check_output(
        'Triggered task: unit_tests\n'
        'To collect results, use:\n'
        '  tools/swarming_client/swarming.py collect '
        '-S https://localhost:1 12300\n'
        'Or visit:\n'
        '  https://localhost:1/user/task/12300\n',
        '')

  def test_trigger_isolated_and_json(self):
    write_json_calls = []
    self.mock(tools, 'write_json', lambda *args: write_json_calls.append(args))
    subprocess_calls = []
    self.mock(subprocess42, 'call', lambda *c: subprocess_calls.append(c))
    self.mock(swarming, 'now', lambda: 123456)

    isolated = os.path.join(self.tempdir, 'zaz.isolated')
    content = '{}'
    with open(isolated, 'wb') as f:
      f.write(content)

    isolated_hash = isolateserver_fake.hash_content(content)
    request = gen_request_data(
        task_slices=[
          {
            'expiration_secs': 3600,
            'properties': gen_properties(
                idempotent=True,
                inputs_ref={
                  'isolated': isolated_hash,
                  'isolatedserver': 'https://localhost:2',
                  'namespace': 'default-gzip',
                }),
            'wait_for_capacity': False,
          },
        ])
    result = gen_request_response(request)
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request},
            result,
          ),
        ])
    ret = self.main_safe([
        'trigger',
        '--swarming', 'https://localhost:1',
        '--isolate-server', 'https://localhost:2',
        '--shards', '1',
        '--priority', '101',
        '--dimension', 'os', 'Mac',
        '--dimension', 'pool', 'default',
        '--expiration', '3600',
        '--user', 'joe@localhost',
        '--tags', 'tag:a',
        '--tags', 'tag:b',
        '--hard-timeout', '60',
        '--io-timeout', '60',
        '--idempotent',
        '--task-name', 'unit_tests',
        '--dump-json', 'foo.json',
        '--isolated', isolated_hash,
        '--',
        '--some-arg',
        '123',
      ])
    # pylint: disable=no-member
    actual = sys.stdout.getvalue()
    self.assertEqual(0, ret, (actual, sys.stderr.getvalue()))
    self.assertEqual([], subprocess_calls)
    self._check_output(
        'Triggered task: unit_tests\n'
        'To collect results, use:\n'
        '  tools/swarming_client/swarming.py collect '
        '-S https://localhost:1 --json foo.json\n'
        'Or visit:\n'
        '  https://localhost:1/user/task/12300\n',
        '')
    expected = [
      (
        u'foo.json',
        {
          'base_task_name': 'unit_tests',
          'tasks': {
            'unit_tests': {
              'shard_index': 0,
              'task_id': '12300',
              'view_url': 'https://localhost:1/user/task/12300',
            }
          },
          'request': {
            'name': 'unit_tests',
            'parent_task_id': '',
            'pool_task_template': 'AUTO',
            'priority': 101,
            'task_slices': [
              {
                'expiration_secs': 3600,
                'properties': gen_properties(
                    idempotent=True,
                    inputs_ref={
                      'isolated': isolated_hash,
                      'isolatedserver': 'https://localhost:2',
                      'namespace': 'default-gzip',
                    }),
                'wait_for_capacity': False,
              },
            ],
            'tags': ['tag:a', 'tag:b'],
            'user': 'joe@localhost',
          },
        },
        True,
      ),
    ]
    self.assertEqual(expected, write_json_calls)

  def test_trigger_cipd(self):
    self.mock(swarming, 'now', lambda: 123456)

    request = gen_request_data(
        task_slices=[
          {
            'expiration_secs': 3600,
            'properties': gen_properties(
                cipd_input={
                  'client_package': None,
                  'packages': [
                    {
                      'package_name': 'super/awesome/pkg',
                      'path': 'path/to/pkg',
                      'version': 'version:42',
                    },
                  ],
                  'server': None,
                },
                inputs_ref={
                  'isolated': u'1111111111111111111111111111111111111111',
                  'isolatedserver': 'https://localhost:2',
                  'namespace': 'default-gzip',
                }),
            'wait_for_capacity': False,
          },
        ])
    result = gen_request_response(request)
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request},
            result,
          ),
        ])
    ret = self.main_safe([
        'trigger',
        '--swarming', 'https://localhost:1',
        '--isolate-server', 'https://localhost:2',
        '--shards', '1',
        '--priority', '101',
        '--dimension', 'os', 'Mac',
        '--dimension', 'pool', 'default',
        '--expiration', '3600',
        '--user', 'joe@localhost',
        '--tags', 'tag:a',
        '--tags', 'tag:b',
        '--hard-timeout', '60',
        '--io-timeout', '60',
        '--task-name', 'unit_tests',
        '--isolated', FILE_HASH,
        '--cipd-package', 'path/to/pkg:super/awesome/pkg:version:42',
        '--',
        '--some-arg',
        '123',
      ])
    # pylint: disable=no-member
    actual = sys.stdout.getvalue()
    self.assertEqual(0, ret, (actual, sys.stderr.getvalue()))
    self._check_output(
        'Triggered task: unit_tests\n'
        'To collect results, use:\n'
        '  tools/swarming_client/swarming.py collect '
        '-S https://localhost:1 12300\n'
        'Or visit:\n'
        '  https://localhost:1/user/task/12300\n',
        '')

  def test_trigger_no_request(self):
    with self.assertRaises(SystemExit):
      main([
            'trigger', '--swarming', 'https://host',
            '--isolate-server', 'https://host', '-T', 'foo',
            '-d', 'pool', 'default',
          ])
    self._check_output(
        '',
        'Usage: swarming.py trigger [options] (hash|isolated) '
          '[-- extra_args|raw command]\n'
        '\n'
        'swarming.py: error: Specify at least one of --raw-cmd or --isolated '
        'or both\n')

  def test_trigger_no_env_vars(self):
    with self.assertRaises(SystemExit):
      main(['trigger'])
    self._check_output(
        '',
        'Usage: swarming.py trigger [options] (hash|isolated) '
          '[-- extra_args|raw command]'
        '\n\n'
        'swarming.py: error: --swarming is required.'
        '\n')

  def test_trigger_no_swarming_env_var(self):
    with self.assertRaises(SystemExit):
      with test_env.EnvVars({'ISOLATE_SERVER': 'https://host'}):
        main(['trigger', '-T' 'foo', 'foo.isolated'])
    self._check_output(
        '',
        'Usage: swarming.py trigger [options] (hash|isolated) '
          '[-- extra_args|raw command]'
        '\n\n'
        'swarming.py: error: --swarming is required.'
        '\n')

  def test_trigger_no_isolate_server(self):
    with self.assertRaises(SystemExit):
      with test_env.EnvVars({'SWARMING_SERVER': 'https://host'}):
        main(['trigger', 'foo.isolated', '-d', 'pool', 'default'])
    self._check_output(
        '',
        'Usage: swarming.py trigger [options] (hash|isolated) '
          '[-- extra_args|raw command]'
        '\n\n'
        'swarming.py: error: Specify at least one of --raw-cmd or --isolated '
          'or both\n')

  def test_trigger_no_dimension(self):
    with self.assertRaises(SystemExit):
      main([
            'trigger', '--swarming', 'https://host', '--raw-cmd', '--', 'foo',
          ])
    self._check_output(
        '',
        'Usage: swarming.py trigger [options] (hash|isolated) '
          '[-- extra_args|raw command]'
        '\n\n'
        'swarming.py: error: Please at least specify one --dimension\n')

  def test_collect_default_json(self):
    j = os.path.join(self.tempdir, 'foo.json')
    data = {
      'base_task_name': 'unit_tests',
      'tasks': {
        'unit_tests': {
          'shard_index': 0,
          'task_id': '12300',
          'view_url': 'https://localhost:1/user/task/12300',
        }
      },
      'request': {
        'name': 'unit_tests',
        'parent_task_id': '',
        'priority': 101,
        'task_slices': [
          {
            'expiration_secs': 3600,
            'properties': gen_properties(
                command=['python', '-c', 'print(\'hi\')'],
                relative_cwd='deeep'),
            'wait_for_capacity': False,
          },
        ],
        'tags': ['tag:a', 'tag:b'],
        'user': 'joe@localhost',
      },
    }
    with open(j, 'wb') as f:
      json.dump(data, f)
    def stub_collect(
        swarming_server, task_ids, timeout, decorate, print_status_updates,
        task_summary_json, task_output_dir, task_output_stdout, include_perf,
        filepath_filter):
      self.assertEqual('https://host', swarming_server)
      self.assertEqual([u'12300'], task_ids)
      # It is automatically calculated from hard timeout + expiration + 10.
      self.assertEqual(3670., timeout)
      self.assertEqual(True, decorate)
      self.assertEqual(True, print_status_updates)
      self.assertEqual('/a', task_summary_json)
      self.assertEqual('/b', task_output_dir)
      self.assertSetEqual(set(['console', 'json']), set(task_output_stdout))
      self.assertEqual(False, include_perf)
      self.assertEqual('output.json', filepath_filter)
      print('Fake output')
    self.mock(swarming, 'collect', stub_collect)
    self.main_safe(
        ['collect', '--swarming', 'https://host', '--json', j, '--decorate',
         '--print-status-updates', '--task-summary-json', '/a',
         '--task-output-dir', '/b', '--task-output-stdout', 'all',
         '--filepath-filter', 'output.json'])
    self._check_output('Fake output\n', '')

  def test_post(self):
    out = StringIO.StringIO()
    err = StringIO.StringIO()
    self.mock(sys, 'stdin', StringIO.StringIO('{"a":"b"}'))
    self.mock(sys, 'stdout', out)
    self.mock(sys, 'stderr', err)
    self.expected_requests(
        [
          (
            'http://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': '{"a":"b"}', 'method': 'POST'},
            '{"yo":"dawg"}',
            {},
          ),
        ])
    ret = self.main_safe(['post', '-S', 'http://localhost:1', 'tasks/new'])
    self.assertEqual(0, ret)
    self.assertEqual('{"yo":"dawg"}', out.getvalue())
    self.assertEqual('', err.getvalue())

  def test_post_fail(self):
    out = StringIO.StringIO()
    err = StringIO.StringIO()
    self.mock(sys, 'stdin', StringIO.StringIO('{"a":"b"}'))
    self.mock(sys, 'stdout', out)
    self.mock(sys, 'stderr', err)
    ret = self.main_safe(['post', '-S', 'http://localhost:1', 'tasks/new'])
    self.assertEqual(1, ret)
    self.assertEqual('', out.getvalue())
    self.assertEqual('No response!\n', err.getvalue())

  def test_query_base(self):
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/bot/botid/tasks?limit=200',
            {},
            {'yo': 'dawg'},
          ),
        ])
    ret = self.main_safe(
        [
          'query', '--swarming', 'https://localhost:1', 'bot/botid/tasks',
        ])
    self._check_output('{\n  "yo": "dawg"\n}\n', '')
    self.assertEqual(0, ret)

  def test_query_cursor(self):
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/bot/botid/tasks?'
                'foo=bar&limit=2',
            {},
            {
              'cursor': '%',
              'extra': False,
              'items': ['A'],
            },
          ),
          (
            'https://localhost:1/_ah/api/swarming/v1/bot/botid/tasks?'
                'foo=bar&cursor=%25&limit=1',
            {},
            {
              'cursor': None,
              'items': ['B'],
              'ignored': True,
            },
          ),
        ])
    ret = self.main_safe(
        [
          'query', '--swarming', 'https://localhost:1',
          'bot/botid/tasks?foo=bar',
          '--limit', '2',
        ])
    expected = (
        '{\n'
        '  "extra": false, \n'
        '  "items": [\n'
        '    "A", \n'
        '    "B"\n'
        '  ]\n'
        '}\n')
    self._check_output(expected, '')
    self.assertEqual(0, ret)

  def test_reproduce(self):
    old_cwd = os.getcwd()
    try:
      os.chdir(self.tempdir)

      def call(cmd, env, cwd):
        w = os.path.abspath('work')
        self.assertEqual([os.path.join(w, 'foo'), '--bar'], cmd)
        expected = os.environ.copy()
        expected['aa'] = 'bb'
        expected['PATH'] = os.pathsep.join(
            (os.path.join(w, 'foo', 'bar'), os.path.join(w, 'second'),
              expected['PATH']))
        expected['SWARMING_TASK_ID'] = 'reproduce'
        expected['SWARMING_BOT_ID'] = 'reproduce'
        self.assertEqual(expected, env)
        self.assertEqual(unicode(w), cwd)
        return 0

      self.mock(subprocess42, 'call', call)

      self.expected_requests(
          [
            (
              'https://localhost:1/_ah/api/swarming/v1/task/123/request',
              {},
              {
                'properties': {
                  'command': ['foo'],
                  'env': [
                    {'key': 'aa', 'value': 'bb'},
                  ],
                  'env_prefixes': [
                    {'key': 'PATH', 'value': ['foo/bar', 'second']},
                  ],
                  'secret_bytes': None,
                },
              },
            ),
          ])
      ret = self.main_safe(
          [
            'reproduce', '--swarming', 'https://localhost:1', '123', '--',
            '--bar',
          ])
      self._check_output('', '')
      self.assertEqual(0, ret)
    finally:
      os.chdir(old_cwd)

  def test_run(self):
    request = {
      'name': u'None/pool=default',
      'parent_task_id': '',
      'priority': 200,
      'pool_task_template': 'AUTO',
      'task_slices': [
        {
          'expiration_secs': 21600,
          'properties': gen_properties(
              command=['python', '-c', 'print(\'hi\')'],
              dimensions=[{'key': 'pool', 'value': 'default'}],
              execution_timeout_secs=3600,
              extra_args=None,
              inputs_ref=None,
              io_timeout_secs=1200,
              relative_cwd='deeep'),
          'wait_for_capacity': False,
        },
      ],
      'tags': [],
      'user': None,
    }
    result = gen_request_response(request)

    def stub_collect(
        swarming_server, task_ids, timeout, decorate, print_status_updates,
        task_summary_json, task_output_dir, task_output_stdout, include_perf,
        filepath_filter):
      self.assertEqual('https://localhost:1', swarming_server)
      self.assertEqual([u'12300'], task_ids)
      # It is automatically calculated from hard timeout + expiration + 10.
      self.assertEqual(25210., timeout)
      self.assertEqual(None, decorate)
      self.assertEqual(None, print_status_updates)
      self.assertEqual(None, task_summary_json)
      self.assertEqual(None, task_output_dir)
      self.assertSetEqual(set(['console', 'json']), set(task_output_stdout))
      self.assertEqual(False, include_perf)
      self.assertEqual(None, filepath_filter)
      print('Fake output')
      return 0
    self.mock(swarming, 'collect', stub_collect)
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/tasks/new',
            {'data': request},
            result,
          ),
        ])
    ret = self.main_safe([
        'run',
        '--swarming', 'https://localhost:1',
        '--dimension', 'pool', 'default',
        '--raw-cmd',
        '--relative-cwd', 'deeep',
        '--',
        'python',
        '-c',
        'print(\'hi\')',
      ])
    # pylint: disable=no-member
    actual = sys.stdout.getvalue()
    self.assertEqual(0, ret, (ret, actual, sys.stderr.getvalue()))
    self._check_output(
        u'Triggered task: None/pool=default\nFake output\n', '')

  def test_cancel(self):
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/task/10100/cancel',
            {'data': {'kill_running': False}, 'method': 'POST'},
            {'yo': 'dawg'},
          ),
        ])
    ret = self.main_safe(
        [
          'cancel', '--swarming', 'https://localhost:1', '10100',
        ])
    self._check_output('', '')
    self.assertEqual(0, ret)

  def test_collect_timeout_zero(self):
    j = os.path.join(self.tempdir, 'foo.json')
    pending = gen_result_response(state='PENDING')
    self.expected_requests(
        [
          (
            'https://localhost:1/_ah/api/swarming/v1/task/10100/result',
            {'retry_50x': True},
            pending,
          ),
        ])
    self.main_safe(
        [
          'collect', '--swarming', 'https://localhost:1',
          '--task-summary-json', j, '--timeout', '-1', '10100',
        ])
    self._check_output('swarm6: 10100 0\n', '')
    with open(j, 'r') as f:
      actual = json.load(f)
    self.assertEqual({u'shards': [pending]}, actual)


class TestCommandBot(NetTestCase):
  # Specialized test fixture for command 'bot'.
  def setUp(self):
    super(TestCommandBot, self).setUp()
    # Sample data retrieved from actual server.
    self.now = unicode(datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'))
    self.bot_1 = {
      u'bot_id': u'swarm1',
      u'created_ts': self.now,
      u'dimensions': [
        {u'key': u'cores', u'value': [u'8']},
        {u'key': u'cpu', u'value': [u'x86', u'x86-64']},
        {u'key': u'gpu', u'value': []},
        {u'key': u'id', u'value': [u'swarm1']},
        {u'key': u'os', u'value': [u'Ubuntu', u'Ubuntu-12.04']},
      ],
      u'external_ip': u'1.1.1.1',
      u'hostname': u'swarm1.example.com',
      u'internal_ip': u'192.168.0.1',
      u'is_dead': True,
      u'last_seen_ts': 'A long time ago',
      u'quarantined': False,
      u'task_id': u'',
      u'task_name': None,
      u'version': u'56918a2ea28a6f51751ad14cc086f118b8727905',
    }
    self.bot_2 = {
      u'bot_id': u'swarm2',
      u'created_ts': self.now,
      u'dimensions': [
        {u'key': u'cores', u'value': [u'8']},
        {u'key': u'cpu', u'value': [u'x86', u'x86-64']},
        {u'key': u'gpu', u'value': [
          u'15ad',
          u'15ad:0405',
          u'VMware Virtual SVGA 3D Graphics Adapter',
        ]},
        {u'key': u'id', u'value': [u'swarm2']},
        {u'key': u'os', u'value': [u'Windows', u'Windows-6.1']},
      ],
      u'external_ip': u'1.1.1.2',
      u'hostname': u'swarm2.example.com',
      u'internal_ip': u'192.168.0.2',
      u'is_dead': False,
      u'last_seen_ts': self.now,
      u'quarantined': False,
      u'task_id': u'',
      u'task_name': None,
      u'version': u'56918a2ea28a6f51751ad14cc086f118b8727905',
    }
    self.bot_3 = {
      u'bot_id': u'swarm3',
      u'created_ts': self.now,
      u'dimensions': [
        {u'key': u'cores', u'value': [u'4']},
        {u'key': u'cpu', u'value': [u'x86', u'x86-64']},
        {u'key': u'gpu', u'value': [u'15ad', u'15ad:0405']},
        {u'key': u'id', u'value': [u'swarm3']},
        {u'key': u'os', u'value': [u'Mac', u'Mac-10.9']},
      ],
      u'external_ip': u'1.1.1.3',
      u'hostname': u'swarm3.example.com',
      u'internal_ip': u'192.168.0.3',
      u'is_dead': False,
      u'last_seen_ts': self.now,
      u'quarantined': False,
      u'task_id': u'148569b73a89501',
      u'task_name': u'browser_tests',
      u'version': u'56918a2ea28a6f51751ad14cc086f118b8727905',
    }
    self.bot_4 = {
      u'bot_id': u'swarm4',
      u'created_ts': self.now,
      u'dimensions': [
        {u'key': u'cores', u'value': [u'8']},
        {u'key': u'cpu', u'value': [u'x86', u'x86-64']},
        {u'key': u'gpu', u'value': []},
        {u'key': u'id', u'value': [u'swarm4']},
        {u'key': u'os', u'value': [u'Ubuntu', u'Ubuntu-12.04']},
      ],
      u'external_ip': u'1.1.1.4',
      u'hostname': u'swarm4.example.com',
      u'internal_ip': u'192.168.0.4',
      u'is_dead': False,
      u'last_seen_ts': self.now,
      u'quarantined': False,
      u'task_id': u'14856971a64c601',
      u'task_name': u'base_unittests',
      u'version': u'56918a2ea28a6f51751ad14cc086f118b8727905',
    }

  def mock_swarming_api(self, bots, cursor):
    """Returns fake /_ah/api/swarming/v1/bots/list data."""
    # Sample data retrieved from actual server.
    return {
      u'items': bots,
      u'cursor': cursor,
      u'death_timeout': 1800.0,
      u'limit': 4,
      u'now': unicode(self.now),
    }

  def test_bots(self):
    base_url = 'https://localhost:1/_ah/api/swarming/v1/bots/list?'
    self.expected_requests(
        [
          (
            base_url + 'is_dead=FALSE&is_busy=NONE&is_mp=NONE',
            {},
            self.mock_swarming_api([self.bot_2], 'opaque'),
          ),
          (
            base_url + 'is_dead=FALSE&is_busy=NONE&is_mp=NONE&cursor=opaque',
            {},
            self.mock_swarming_api([self.bot_3], 'opaque2'),
          ),
          (
            base_url + 'is_dead=FALSE&is_busy=NONE&is_mp=NONE&cursor=opaque2',
            {},
            self.mock_swarming_api([self.bot_4], None),
          ),
        ])
    ret = self.main_safe(['bots', '--swarming', 'https://localhost:1'])
    expected = (
        u'swarm2\n'
        u'  {"cores": ["8"], "cpu": ["x86", "x86-64"], "gpu": '
          '["15ad", "15ad:0405", "VMware Virtual SVGA 3D Graphics Adapter"], '
          '"id": ["swarm2"], "os": ["Windows", "Windows-6.1"]}\n'
        'swarm3\n'
        '  {"cores": ["4"], "cpu": ["x86", "x86-64"], "gpu": ["15ad", '
          '"15ad:0405"], "id": ["swarm3"], "os": ["Mac", "Mac-10.9"]}\n'
        u'  task: 148569b73a89501\n'
        u'swarm4\n'
        u'  {"cores": ["8"], "cpu": ["x86", "x86-64"], "gpu": [], '
          '"id": ["swarm4"], "os": ["Ubuntu", "Ubuntu-12.04"]}\n'
        u'  task: 14856971a64c601\n')
    self._check_output(expected, '')
    self.assertEqual(0, ret)

  def test_bots_bare(self):
    base_url = 'https://localhost:1/_ah/api/swarming/v1/bots/list?'
    self.expected_requests(
        [
          (
            base_url + 'is_dead=FALSE&is_busy=NONE&is_mp=NONE',
            {},
            self.mock_swarming_api([self.bot_2], 'opaque'),
          ),
          (
            base_url + 'is_dead=FALSE&is_busy=NONE&is_mp=NONE&cursor=opaque',
            {},
            self.mock_swarming_api([self.bot_3], 'opaque2'),
          ),
          (
            base_url + 'is_dead=FALSE&is_busy=NONE&is_mp=NONE&cursor=opaque2',
            {},
            self.mock_swarming_api([self.bot_4], None),
          ),
        ])
    ret = self.main_safe(
        ['bots', '--swarming', 'https://localhost:1', '--bare'])
    self._check_output("swarm2\nswarm3\nswarm4\n", '')
    self.assertEqual(0, ret)

  def test_bots_filter(self):
    base_url = 'https://localhost:1/_ah/api/swarming/v1/bots/list?'
    self.expected_requests(
        [
          (
            base_url +
              'is_dead=FALSE&is_busy=TRUE&is_mp=NONE&dimensions=os%3AWindows',
            {},
            self.mock_swarming_api([self.bot_2], None),
          ),
        ])
    ret = self.main_safe(
        [
          'bots', '--swarming', 'https://localhost:1',
          '--busy',
          '--dimension', 'os', 'Windows',
        ])
    expected = (
        u'swarm2\n  {"cores": ["8"], "cpu": ["x86", "x86-64"], '
          '"gpu": ["15ad", "15ad:0405", "VMware Virtual SVGA 3D Graphics '
          'Adapter"], "id": ["swarm2"], '
          '"os": ["Windows", "Windows-6.1"]}\n')
    self._check_output(expected, '')
    self.assertEqual(0, ret)

  def test_bots_filter_keep_dead(self):
    base_url = 'https://localhost:1/_ah/api/swarming/v1/bots/list?'
    self.expected_requests(
        [
          (
            base_url + 'is_dead=NONE&is_busy=NONE&is_mp=NONE',
            {},
            self.mock_swarming_api([self.bot_1, self.bot_4], None),
          ),
        ])
    ret = self.main_safe(
        [
          'bots', '--swarming', 'https://localhost:1',
          '--keep-dead',
        ])
    expected = (
        u'swarm1\n  {"cores": ["8"], "cpu": ["x86", "x86-64"], "gpu": [], '
          '"id": ["swarm1"], "os": ["Ubuntu", "Ubuntu-12.04"]}\n'
        u'swarm4\n'
        u'  {"cores": ["8"], "cpu": ["x86", "x86-64"], "gpu": [], '
          '"id": ["swarm4"], "os": ["Ubuntu", "Ubuntu-12.04"]}\n'
        u'  task: 14856971a64c601\n')
    self._check_output(expected, '')
    self.assertEqual(0, ret)

  def test_bots_filter_dead_only(self):
    base_url = 'https://localhost:1/_ah/api/swarming/v1/bots/list?'
    self.expected_requests(
        [
          (
            base_url +
              'is_dead=TRUE&is_busy=NONE&is_mp=NONE&dimensions=os%3AUbuntu',
            {},
            self.mock_swarming_api([self.bot_1], None),
          ),
        ])
    ret = self.main_safe(
        [
          'bots', '--swarming', 'https://localhost:1',
          '--dimension', 'os', 'Ubuntu', '--dead-only',
        ])
    expected = (
        u'swarm1\n  {"cores": ["8"], "cpu": ["x86", "x86-64"], "gpu": [], '
          '"id": ["swarm1"], "os": ["Ubuntu", "Ubuntu-12.04"]}\n')
    self._check_output(expected, '')
    self.assertEqual(0, ret)


if __name__ == '__main__':
  for env_var_to_remove in (
      'ISOLATE_SERVER', 'SWARMING_TASK_ID', 'SWARMING_SERVER'):
    os.environ.pop(env_var_to_remove, None)
  test_env.main()
