#!/usr/bin/env vpython
# coding: utf-8
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

import base64
import datetime
import logging
import os
import random
import StringIO
import sys
import unittest
import zipfile

import mock
from parameterized import parameterized

# Setups environment.
import test_env_handlers

from google.appengine.api import datastore_errors
from google.appengine.ext import ndb

import webapp2
import webtest

import handlers_bot
from components import auth
from components import ereporter2
from components import utils
from components.test_support import test_case
from proto.config import bots_pb2
from proto.config import pools_pb2
from proto.plugin import plugin_pb2
from server import bot_archive
from server import bot_auth
from server import bot_code
from server import bot_groups_config
from server import bot_management
from server import external_scheduler
from server import pools_config
from server import rbe
from server import realms
from server import service_accounts
from server import task_pack
from server import task_queues
from server import task_scheduler


def fmtdate(d):
  """Formats a datetime.datetime instance to the format generated by the API."""
  return unicode(d.strftime('%Y-%m-%dT%H:%M:%S'))


class BotApiTest(test_env_handlers.AppTestBase):
  # crbug.com/10569967
  no_run = 1

  def setUp(self):
    super(BotApiTest, self).setUp()
    # By default requests in tests are coming from bot with fake IP.
    routes = handlers_bot.get_routes()
    app = webapp2.WSGIApplication(routes, debug=True)
    self.app = webtest.TestApp(
        app,
        extra_environ={
            'REMOTE_ADDR': self.source_ip,
            'SERVER_SOFTWARE': os.environ['SERVER_SOFTWARE'],
        })
    self.mock(ereporter2, 'log_request', lambda *args, **kwargs: self.fail(
        '%s, %s' % (args, kwargs)))
    # Bot API test cases run by default as bot.
    self.set_as_bot()
    self.now = datetime.datetime(2010, 1, 2, 3, 4, 5)
    self.mock_now(self.now)
    self.mock_default_pool_acl([])
    self.mock_tq_tasks()

  def tearDown(self):
    super(BotApiTest, self).tearDown()
    mock.patch.stopall()

  def mock_bot_group_config(self, **kwargs):
    kwargs = kwargs.copy()
    for f in bot_groups_config.BotGroupConfig._fields:
      if f not in kwargs:
        kwargs[f] = None
    group_cfg = bot_groups_config.BotGroupConfig(**kwargs)
    auth_cfg = bot_groups_config.BotAuth(
        log_if_failed=False,
        require_luci_machine_token=True,
        require_service_account=None,
        require_gce_vm_token=None,
        ip_whitelist=None,
    )
    self.mock(bot_auth,
              'authenticate_bot', lambda *args, **kwargs: (group_cfg, auth_cfg))
    return group_cfg, auth_cfg

  def mock_pool_config(self, pool, **kwargs):
    def mocked_get_pool_config(name):
      if name == pool:
        return pools_config.init_pool_config(
            name=name,
            rev='rev',
            scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_UNKNOWN,
            **kwargs)
      return None

    self.mock(pools_config, 'get_pool_config', mocked_get_pool_config)

  def test_handshake(self):
    errors = []

    def add_error(request, source, message):
      self.assertTrue(request)
      self.assertEqual('bot', source)
      errors.append(message)

    self.mock(ereporter2, 'log_request', add_error)
    params = {
        'dimensions': {
            'id': ['id1'],
            'pool': ['default'],
        },
        'state': {
            u'running_time': 0,
            u'sleep_streak': 0
        },
        'version': '1',
    }
    response = self.app.post_json(
        '/swarming/api/v1/bot/handshake', params=params).json
    self.assertEqual([
        u'bot_config_name',
        u'bot_config_rev',
        u'bot_group_cfg',
        u'bot_group_cfg_version',
        u'bot_version',
        u'server_version',
    ], sorted(response))
    self.assertEqual({u'dimensions': {}}, response['bot_group_cfg'])
    self.assertEqual('default', response['bot_group_cfg_version'])
    self.assertEqual(64, len(response['bot_version']))
    self.assertEqual(u'v1a', response['server_version'])
    self.assertEqual([], errors)

  def test_handshake_minimum(self):
    errors = []

    def add_error(request, source, message):
      self.assertTrue(request)
      self.assertEqual('bot', source)
      errors.append(message)

    self.mock(ereporter2, 'log_request', add_error)
    params = {'dimensions': {'id': ['id1']}}
    response = self.app.post_json('/swarming/api/v1/bot/handshake', params).json
    self.assertEqual([
        u'bot_config_name',
        u'bot_config_rev',
        u'bot_group_cfg',
        u'bot_group_cfg_version',
        u'bot_version',
        u'server_version',
    ], sorted(response))
    self.assertEqual(64, len(response['bot_version']))
    self.assertEqual(u'v1a', response['server_version'])
    expected = [
        'Quarantined Bot\n'
        'https://test-swarming.appspot.com/restricted/bot/id1\n'
        'Unexpected keys missing: [u\'state\', u\'version\']; '
        'did you make a typo?',
    ]
    self.assertEqual(expected, errors)

  def test_handshake_duplicate_dimension_value(self):
    # Test a specific failure mode where '<key>:<value>' is duplicate.
    # This throws a BadValueError while trying to save a
    # BotEvent.dimensions_flat, even if the bot is forcibly quarantined.
    errors = []

    def add_error(request, source, message):
      self.assertTrue(request)
      self.assertEqual('bot', source)
      errors.append(message)

    self.mock(ereporter2, 'log_request', add_error)
    params = {
        'dimensions': {
            'id': ['id1'],
            'pool': ['default', 'default'],
        },
        'state': {
            u'running_time': 0,
            u'sleep_streak': 0
        },
        'version': '1',
    }
    response = self.app.post_json(
        '/swarming/api/v1/bot/handshake', params=params).json
    self.assertEqual([
        u'bot_config_name',
        u'bot_config_rev',
        u'bot_group_cfg',
        u'bot_group_cfg_version',
        u'bot_version',
        u'server_version',
    ], sorted(response))
    self.assertEqual({u'dimensions': {}}, response['bot_group_cfg'])
    self.assertEqual('default', response['bot_group_cfg_version'])
    self.assertEqual(64, len(response['bot_version']))
    self.assertEqual(u'v1a', response['server_version'])
    msg = (u'Quarantined Bot\n'
           'https://test-swarming.appspot.com/restricted/bot/id1\n'
           'Dimension values include duplication. key: pool, values: '
           '[u\'default\', u\'default\']')
    self.assertEqual([msg], errors)

  def test_handshake_long_dimension(self):
    # Test a specific failure mode where '<key>:<value>' is over 1500 bytes.
    # This throws a BadValueError while trying to save a
    # BotEvent.dimensions_flat, even if the bot is forcibly quarantined.
    errors = []

    def add_error(request, source, message):
      self.assertTrue(request)
      self.assertEqual('bot', source)
      errors.append(message)

    self.mock(ereporter2, 'log_request', add_error)
    params = {
        'dimensions': {
            'id': ['id1'],
            'pool': ['default'],
            'a': ['b' * 1499],
        },
        'state': {
            u'running_time': 0,
            u'sleep_streak': 0
        },
        'version': '1',
    }
    response = self.app.post_json(
        '/swarming/api/v1/bot/handshake', params=params).json
    self.assertEqual([
        u'bot_config_name',
        u'bot_config_rev',
        u'bot_group_cfg',
        u'bot_group_cfg_version',
        u'bot_version',
        u'server_version',
    ], sorted(response))
    self.assertEqual({u'dimensions': {}}, response['bot_group_cfg'])
    self.assertEqual('default', response['bot_group_cfg_version'])
    self.assertEqual(64, len(response['bot_version']))
    self.assertEqual(u'v1a', response['server_version'])
    msg = (u'Quarantined Bot\n'
           'https://test-swarming.appspot.com/restricted/bot/id1\n'
           'Invalid dimension value. key: a, value: %s' % ('b' * 1499))
    self.assertEqual([msg], errors)

  def test_handshake_extra(self):
    errors = []

    def add_error(request, source, message):
      self.assertTrue(request)
      self.assertEqual('bot', source)
      errors.append(message)

    self.mock(ereporter2, 'log_request', add_error)
    params = {
        # Works with unknown items but logs an error. This permits catching
        # typos.
        'foo': 1,
        'dimensions': {
            'id': ['bot1'],
            'pool': ['default'],
        },
        'state': {
            'bar': 2,
            'ip': '127.0.0.1',
            'running_time': 1234.0,
            'sleep_streak': 0,
        },
        'version': '123',
    }
    response = self.app.post_json(
        '/swarming/api/v1/bot/handshake', params=params).json
    self.assertEqual([
        u'bot_config_name',
        u'bot_config_rev',
        u'bot_group_cfg',
        u'bot_group_cfg_version',
        u'bot_version',
        u'server_version',
    ], sorted(response))
    self.assertEqual(64, len(response['bot_version']))
    self.assertEqual(u'v1a', response['server_version'])
    expected = [
        u'Quarantined Bot\n'
        u'https://test-swarming.appspot.com/restricted/bot/bot1\n'
        u'Unexpected keys superfluous: [u\'foo\']; did you make a typo?',
    ]
    self.assertEqual(expected, errors)

  def test_poll_maintenance(self):
    params = self.do_handshake()
    params['state']['maintenance'] = 'very busy'
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertTrue(response.pop(u'duration'))
    expected = {
        u'cmd': u'sleep',
        u'quarantined': True,
    }
    self.assertEqual(expected, response)

    # with non string maintenance message.
    params['state']['maintenance'] = True
    self.post_json('/swarming/api/v1/bot/poll', params, status=400)

  def test_poll_bad_bot(self):
    # If bot is not sending required keys but report right version, enforce
    # sleeping.
    errors = []

    def add_error(request, source, message):
      self.assertTrue(request)
      self.assertEqual('bot', source)
      errors.append(message)

    self.mock(ereporter2, 'log_request', add_error)
    params = self.do_handshake()
    params.pop('state')
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    expected = {
        u'cmd': u'sleep',
        u'quarantined': True,
    }
    self.assertTrue(response.pop(u'duration'))
    self.assertEqual(expected, response)
    expected = [
        'Quarantined Bot\n'
        'https://test-swarming.appspot.com/restricted/bot/bot1\n'
        'Unexpected keys missing: [u\'state\']; '
        'did you make a typo?',
    ]
    self.assertEqual(expected, errors)

  def test_poll_no_bot_id(self):
    params = self.do_handshake()
    params['dimensions'].pop('id')
    response = self.post_json('/swarming/api/v1/bot/poll', params, status=403)
    expected = {
        u'text': u'Bot ID is not specified',
    }
    self.assertEqual(expected, response)

  def test_poll_empty_dimension_value(self):
    params = self.do_handshake()
    params['dimensions']['empty_key'] = []
    errors = []

    def add_error(request, source, message):
      self.assertTrue(request)
      self.assertEqual('bot', source)
      errors.append(message)

    self.mock(ereporter2, 'log_request', add_error)
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertTrue(response['quarantined'])
    self.assertEqual([
        'Quarantined Bot\n'
        'https://test-swarming.appspot.com/restricted/bot/bot1\n'
        'Dimension values should not be empty. key: empty_key'
    ], errors)

  def test_poll_bad_version(self):
    params = self.do_handshake()
    latest_version = params['version']
    params['version'] = 'badversion'
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    expected = {
        u'cmd': u'update',
        u'version': latest_version,
    }
    self.assertEqual(expected, response)

  def test_poll_bad_dimensions(self):
    ticker = test_case.Ticker(self.now)
    errors = []
    def add_error(request, source, message):
      self.assertTrue(request)
      self.assertEqual('bot', source)
      errors.append(message)

    self.mock(ereporter2, 'log_request', add_error)

    self.mock_now(ticker())
    params = self.do_handshake()
    params['dimensions']['foo'] = [['bar']]  # list is invalid.
    t1 = self.mock_now(ticker())
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertTrue(response.pop(u'duration'))
    expected = {
        u'cmd': u'sleep',
        u'quarantined': True,
    }
    self.assertEqual(expected, response)

    # Quarantine message should be explicit.
    msg = (u'Quarantined Bot\n'
           'https://test-swarming.appspot.com/restricted/bot/bot1\n'
           'Invalid dimension value. key: foo, value: [u\'bar\']')
    self.assertEqual([msg], errors)

    # Quarantine event should be registered, too.
    expected_event = {
        'authenticated_as': u'bot:whitelisted-ip',
        # last valid dimensions should be used in BotEvent.
        'dimensions': {
            u'id': [u'bot1'],
            u'pool': [u'default']
        },
        'event_type': u'request_sleep',
        'external_ip': u'192.168.2.2',
        'idle_since_ts': None,
        'last_seen_ts': t1,
        'lease_expiration_ts': None,
        'lease_id': None,
        'leased_indefinitely': None,
        'machine_lease': None,
        'machine_type': None,
        'maintenance_msg': None,
        'message': u"Invalid dimension value. key: foo, value: [u'bar']",
        'quarantined': True,
        'state': {
            u'bot_group_cfg_version': u'default',
            u'running_time': 1234.0,
            u'sleep_streak': 0,
            u'started_ts': 1410990411.111,
        },
        'task_id': None,
        'ts': t1,
        'version': self.bot_version,
    }
    events = [e.to_dict() for e in bot_management.get_events_query('bot1')]
    self.assertEqual(events[0], expected_event)

    # BotInfo should be changed to quarantined state, too.
    bot_info = bot_management.get_info_key('bot1').get()
    self.assertTrue(bot_info.quarantined)
    # last valid dimensions should be kept in BotInfo.
    self.assertEqual(bot_info.dimensions_flat, [u'id:bot1', u'pool:default'])

  def test_poll_sleep(self):
    # A bot polls, gets nothing.
    params = self.do_handshake()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertTrue(response.pop(u'duration'))
    expected = {
        u'cmd': u'sleep',
        u'quarantined': False,
    }
    self.assertEqual(expected, response)

    # Sleep again
    params['state']['sleep_streak'] += 1
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertTrue(response.pop(u'duration'))
    expected = {
        u'cmd': u'sleep',
        u'quarantined': False,
    }
    self.assertEqual(expected, response)

  def test_poll_rbe(self):
    _, bot_auth_cfg = self.mock_bot_group_config(
        version='default',
        dimensions={u'pool': [u'default']},
        is_default=True)
    self.mock_pool_config(
        pool='default',
        rbe_migration=pools_pb2.Pool.RBEMigration(
            rbe_instance='some-instance',
            bot_mode_allocation=[
                {
                    'mode': 'RBE',
                    'percent': 100
                },
            ],
        ),
    )
    self.mock(rbe, 'generate_poll_token', mock.Mock())
    rbe.generate_poll_token.return_value = 'mocked-poll-token'
    self.mock(task_scheduler, 'exponential_backoff', lambda _: 1.23)

    expected_rbe_params = {
        u'instance': u'some-instance',
        u'hybrid_mode': False,
        u'sleep': 1.23,
        u'poll_token': u'mocked-poll-token',
    }

    # A bot notifies Swarming of its existence, gets RBE parameters.
    handshake_response = {}
    params = self.do_handshake(response_copy=handshake_response)
    self.assertEqual(expected_rbe_params, handshake_response['rbe'])

    # A bot polls, gets RBE parameters.
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    expected = {
        u'cmd': u'rbe',
        u'rbe': expected_rbe_params,
    }
    self.assertEqual(expected, response)

    # The poll token was generated with correct parameters.
    rbe.generate_poll_token.assert_called_with(
        bot_id='bot1',
        rbe_instance='some-instance',
        enforced_dimensions={u'pool': [u'default']},
        bot_auth_cfg=bot_auth_cfg)

    # The bot is idle, since it didn't report `rbe_idle`.
    bot_info = bot_management.get_info_key('bot1').get()
    self.assertEqual(bot_info.last_seen_ts, self.now)
    self.assertEqual(bot_info.idle_since_ts, self.now)

    self.now += datetime.timedelta(seconds=1)
    self.mock_now(self.now)

    # The bot is reporting that is it busy now.
    params['state']['rbe_idle'] = False
    self.post_json('/swarming/api/v1/bot/poll', params)

    # It is indeed marked as busy in datastore.
    bot_info = bot_management.get_info_key('bot1').get()
    self.assertEqual(bot_info.last_seen_ts, self.now)
    self.assertIsNone(bot_info.idle_since_ts)

  def test_poll_rbe_in_hybrid_mode(self):
    self.mock_bot_group_config(
        version='default',
        dimensions={u'pool': [u'default']},
        is_default=True)
    self.mock_pool_config(
        pool='default',
        rbe_migration=pools_pb2.Pool.RBEMigration(
            rbe_instance='some-instance',
            bot_mode_allocation=[
                {
                    'mode': 'HYBRID',
                    'percent': 100
                },
            ],
        ),
        realm='test:pool/default',
    )
    self.mock(rbe, 'generate_poll_token', mock.Mock())
    rbe.generate_poll_token.return_value = 'mocked-poll-token'

    # A bot polls and registers itself in Swarming scheduler.
    self.set_as_bot()
    params = self.do_handshake()
    params['force'] = False
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertEqual(response['cmd'], u'rbe')

    # Enqueue a native Swarming task. It should be accepted, there's a bot
    # for it.
    self.set_as_user()
    self.mock_auth_db([
        auth.Permission('swarming.pools.createTask'),
        auth.Permission('swarming.tasks.createInRealm'),
    ])
    _, task_id = self.client_create_task_raw(
        properties={u'relative_cwd': u'de/ep'}, realm='test:task_realm')
    task_id = task_id[:-1] + '1'

    # A bot polls again, but still only to register itself.
    self.set_as_bot()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertEqual(response['cmd'], u'rbe')

    # A bot polls again asking to force-poll Swarming scheduler, gets the task.
    params['force'] = True
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertEqual(response['cmd'], u'run')
    self.assertEqual(response['manifest']['task_id'], task_id)

  def test_poll_update(self):
    params = self.do_handshake()
    old_version = params['version']
    params['version'] = 'badversion'
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    expected = {
        u'cmd': u'update',
        u'version': old_version,
    }
    self.assertEqual(expected, response)

  def test_poll_bot_group_config_change(self):
    params = self.do_handshake()
    params['state']['bot_group_cfg_version'] = 'badversion'
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    expected = {
        u'cmd': u'bot_restart',
        u'message': u'Restarting to pick up new bots.cfg config',
    }
    self.assertEqual(expected, response)

  def test_poll_bot_group_config_change_with_quarantined_flag(self):
    params = self.do_handshake()
    params['state']['bot_group_cfg_version'] = 'badversion'
    params['state']['quarantined'] = True

    response = self.post_json('/swarming/api/v1/bot/poll', params)
    expected = {
        u'cmd': u'bot_restart',
        u'message': u'Restarting to pick up new bots.cfg config',
    }
    self.assertEqual(expected, response)

  def test_poll_task_raw(self):
    # Successfully poll a task.
    self.mock(random, 'getrandbits', lambda _: 0x88)
    # A bot polls, gets a task, updates it, completes it.
    params = self.do_handshake(do_first_poll=True)

    # Enqueue a task.
    self.set_as_user()
    _, task_id = self.client_create_task_raw(
        properties={u'relative_cwd': u'de/ep'})
    self.assertEqual('0', task_id[-1])
    # Convert TaskResultSummary reference to TaskRunResult.
    task_id = task_id[:-1] + '1'

    self.set_as_bot()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    expected = {
        u'cmd': u'run',
        u'manifest': {
            u'bot_id': u'bot1',
            u'bot_authenticated_as': u'bot:whitelisted-ip',
            u'caches': [],
            u'cipd_input': {
                u'client_package': {
                    u'package_name': u'infra/tools/cipd/${platform}',
                    u'path': None,
                    u'version': u'git_revision:deadbeef',
                },
                u'packages': [{
                    u'package_name': u'rm',
                    u'path': u'bin',
                    u'version': u'git_revision:deadbeef',
                }],
                u'server':
                u'https://pool.config.cipd.example.com',
            },
            u'command': [u'python', u'run_test.py'],
            u'containment': {
                u'containment_type': 2,
            },
            u'relative_cwd': u'de/ep',
            u'dimensions': {
                u'os': [u'Amiga'],
                u'pool': [u'default'],
            },
            u'env': {},
            u'env_prefixes': {},
            u'grace_period': 30,
            u'hard_timeout': 3600,
            u'host': u'http://localhost:8080',
            u'cas_input_root': None,
            u'secret_bytes': None,
            u'realm': {},
            u'resultdb': None,
            u'io_timeout': 1200,
            u'outputs': [u'foo', u'path/to/foobar'],
            u'service_accounts': {
                u'system': {
                    u'service_account': u'none'
                },
                u'task': {
                    u'service_account': u'none'
                },
            },
            u'task_id': task_id,
            u'bot_dimensions': {
                'id': ['bot1'],
                'os': ['Amiga'],
                'pool': ['default'],
            },
        },
    }
    self.assertEqual(expected, response)

    self.set_as_user()
    response = self.client_get_results(task_id)
    expected = self.gen_run_result(
        bot_idle_since_ts=fmtdate(self.now),
        created_ts=fmtdate(self.now),
        modified_ts=fmtdate(self.now),
        started_ts=fmtdate(self.now))
    self.assertEqual(expected, response)

  def test_poll_task_with_bot_service_account(self):
    params = self.do_handshake(do_first_poll=True)

    self.set_as_user()
    _, task_id = self.client_create_task_raw(service_account='bot')
    self.assertEqual('0', task_id[-1])
    task_id = task_id[:-1] + '1'

    self.set_as_bot()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    expected = {
        u'cmd': u'run',
        u'manifest': {
            u'bot_id': u'bot1',
            u'bot_authenticated_as': u'bot:whitelisted-ip',
            u'caches': [],
            u'cipd_input': {
                u'client_package': {
                    u'package_name': u'infra/tools/cipd/${platform}',
                    u'path': None,
                    u'version': u'git_revision:deadbeef',
                },
                u'packages': [{
                    u'package_name': u'rm',
                    u'path': u'bin',
                    u'version': u'git_revision:deadbeef',
                }],
                u'server':
                u'https://pool.config.cipd.example.com',
            },
            u'command': [u'python', u'run_test.py'],
            u'containment': {
                u'containment_type': 2,
            },
            u'relative_cwd': None,
            u'dimensions': {
                u'os': [u'Amiga'],
                u'pool': [u'default'],
            },
            u'env': {},
            u'env_prefixes': {},
            u'grace_period': 30,
            u'hard_timeout': 3600,
            u'host': u'http://localhost:8080',
            u'cas_input_root': None,
            u'secret_bytes': None,
            u'realm': {},
            u'resultdb': None,
            u'io_timeout': 1200,
            u'outputs': [u'foo', u'path/to/foobar'],
            u'service_accounts': {
                u'system': {
                    u'service_account': u'none'
                },
                u'task': {
                    u'service_account': u'bot'
                },
            },
            u'task_id': task_id,
            u'bot_dimensions': {
                'id': ['bot1'],
                'os': ['Amiga'],
                'pool': ['default'],
            },
        },
    }
    self.assertEqual(expected, response)

  def test_poll_task_with_caches(self):
    params = self.do_handshake(do_first_poll=True)

    self.set_as_user()
    _, task_id = self.client_create_task_raw({
        'caches': [{
            'name': 'git_infra',
            'path': 'git_cache',
        }],
    })
    self.assertEqual('0', task_id[-1])
    task_id = task_id[:-1] + '1'

    self.set_as_bot()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    expected = {
        u'cmd': u'run',
        u'manifest': {
            u'bot_id':
            u'bot1',
            u'bot_authenticated_as':
            u'bot:whitelisted-ip',
            u'caches': [
                {
                    u'hint': '-1',
                    u'name': u'git_infra',
                    u'path': u'git_cache',
                },
            ],
            u'cipd_input': {
                u'client_package': {
                    u'package_name': u'infra/tools/cipd/${platform}',
                    u'path': None,
                    u'version': u'git_revision:deadbeef',
                },
                u'packages': [{
                    u'package_name': u'rm',
                    u'path': u'bin',
                    u'version': u'git_revision:deadbeef',
                }],
                u'server':
                u'https://pool.config.cipd.example.com',
            },
            u'command': [u'python', u'run_test.py'],
            u'containment': {
                u'containment_type': 2,
            },
            u'relative_cwd':
            None,
            u'dimensions': {
                u'os': [u'Amiga'],
                u'pool': [u'default'],
            },
            u'env': {},
            u'env_prefixes': {},
            u'grace_period':
            30,
            u'hard_timeout':
            3600,
            u'host':
            u'http://localhost:8080',
            u'cas_input_root':
            None,
            u'io_timeout':
            1200,
            u'outputs': [u'foo', u'path/to/foobar'],
            u'secret_bytes':
            None,
            u'realm': {},
            u'resultdb':
            None,
            u'service_accounts': {
                u'system': {
                    u'service_account': u'none'
                },
                u'task': {
                    u'service_account': u'none'
                },
            },
            u'task_id':
            task_id,
            u'bot_dimensions': {
                'id': ['bot1'],
                'os': ['Amiga'],
                'pool': ['default'],
            },
        },
    }
    self.assertEqual(expected, response)

  @ndb.tasklet
  def _mock_create_invocation_async(self, _task_run_id, _realm, _deadline):
    raise ndb.Return('resultdb-update-token')

  def test_poll_with_resultdb(self):
    params = self.do_handshake(do_first_poll=True)

    self.set_as_user()
    self.mock(auth, 'has_permission', lambda *_args, **_kwargs: True)
    with mock.patch(
        'server.resultdb.create_invocation_async',
        mock.Mock(
            side_effect=self._mock_create_invocation_async)) as mocked_call:
      response, _ = self.client_create_task_raw(
          resultdb={'enable': True}, realm='infra:try')

      invocation_name = response['task_result']['resultdb_info']['invocation']

      mocked_call.assert_called_once()

    self.set_as_bot()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    expected = {
        u'hostname': u'test-resultdb-server.com',
        u'current_invocation': {
            u'name': invocation_name,
            u'update_token': u'resultdb-update-token',
        }
    }
    self.assertEqual(expected, response['manifest']['resultdb'])

  def test_poll_with_realm(self):
    params = self.do_handshake(do_first_poll=True)

    self.mock(auth, 'has_permission', lambda *_args, **_kwargs: True)
    self.mock(service_accounts, 'has_token_server', lambda: True)

    self.set_as_user()
    response, _ = self.client_create_task_raw(realm='test:task_realm')

    self.set_as_bot()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    expected = {
        u'name': u'test:task_realm',
    }
    self.assertEqual(expected, response['manifest']['realm'])

  def test_poll_with_cas_input_root(self):
    # fix task_id.
    self.mock(random, 'getrandbits', lambda _: 0x88)
    params = self.do_handshake(do_first_poll=True)

    self.mock(auth, 'has_permission', lambda *_args, **_kwargs: True)
    self.mock(service_accounts, 'has_token_server', lambda: True)

    self.set_as_user()
    _, task_id = self.client_create_task_cas_input_root()
    task_id = task_id[:-1] + '1'  # convert to run_id.

    self.set_as_bot()
    response = self.post_json('/swarming/api/v1/bot/poll', params)

    expected = {
        u'cmd': u'run',
        u'manifest': {
            u'bot_id': u'bot1',
            u'bot_authenticated_as': u'bot:whitelisted-ip',
            u'caches': [],
            u'cipd_input': {
                u'client_package': {
                    u'package_name': u'infra/tools/cipd/${platform}',
                    u'path': None,
                    u'version': u'git_revision:deadbeef',
                },
                u'packages': [{
                    u'package_name': u'rm',
                    u'path': u'bin',
                    u'version': u'git_revision:deadbeef',
                }],
                u'server':
                u'https://pool.config.cipd.example.com',
            },
            u'command': [u'python', u'run_test.py'],
            u'containment': {
                u'containment_type': 2,
            },
            u'relative_cwd': None,
            u'dimensions': {
                u'os': [u'Amiga'],
                u'pool': [u'default'],
            },
            u'env': {},
            u'env_prefixes': {},
            u'grace_period': 30,
            u'hard_timeout': 3600,
            u'host': u'http://localhost:8080',
            u'cas_input_root': {
                u'cas_instance': u'projects/test/instances/default',
                u'digest': {
                    u'hash': u'12345',
                    u'size_bytes': 1,
                }
            },
            u'secret_bytes': None,
            u'realm': {},
            u'resultdb': None,
            u'io_timeout': 1200,
            u'outputs': [u'foo', u'path/to/foobar'],
            u'service_accounts': {
                u'system': {
                    u'service_account': u'none'
                },
                u'task': {
                    u'service_account': u'none'
                },
            },
            u'task_id': task_id,
            u'bot_dimensions': {
                'id': ['bot1'],
                'os': ['Amiga'],
                'pool': ['default'],
            },
        },
    }
    self.assertEqual(expected, response)

    self.set_as_user()
    response = self.client_get_results(task_id)
    expected = self.gen_run_result(
        bot_idle_since_ts=fmtdate(self.now),
        created_ts=fmtdate(self.now),
        modified_ts=fmtdate(self.now),
        started_ts=fmtdate(self.now))
    self.assertEqual(expected, response)

  def test_poll_conflicting_dimensions(self):
    params = self.do_handshake()
    self.assertEqual(params['dimensions']['pool'], ['default'])

    self.mock_bot_group_config(
        version='default',
        auth=(bot_groups_config.BotAuth(
            log_if_failed=False,
            require_luci_machine_token=False,
            require_service_account=None,
            require_gce_vm_token=None,
            ip_whitelist=None,
        ),),
        dimensions={u'pool': [u'server-side']},
        is_default=True)

    # Bot sends 'default' pool, but server config defined it as 'server-side'.
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertTrue(response.pop(u'duration'))
    expected = {
        u'cmd': u'sleep',
        u'quarantined': False,
    }
    self.assertEqual(expected, response)

  def test_poll_extra_bot_config(self):
    self.mock_bot_group_config(
        version='default',
        auth=(bot_groups_config.BotAuth(
            log_if_failed=False,
            require_luci_machine_token=False,
            require_service_account=None,
            require_gce_vm_token=None,
            ip_whitelist=None,
        ),),
        dimensions={},
        bot_config_script='foo.py',
        bot_config_script_rev='abcd',
        bot_config_script_content='print("Hi");import sys; sys.exit(1)',
        logs_cloud_project='chrome-infra-logs',
        is_default=True)
    params = self.do_handshake()
    self.assertEqual(u'print("Hi");import sys; sys.exit(1)',
                     params['bot_config'])

  def test_poll_with_external_scheduler(self):
    # Inject external scheduler config.
    es_cfg = pools_config.ExternalSchedulerConfig(
        address='qscheduler-test.example.com',
        id='all',
        dimensions=[],
        all_dimensions=[],
        any_dimensions=[],
        enabled=True,
        allow_es_fallback=False)
    mock.patch('server.external_scheduler.config_for_bot').start(
    ).return_value = es_cfg
    mock.patch('server.external_scheduler.config_for_task').start(
    ).return_value = es_cfg
    # Inject prpc client.
    es_client_mock = mock.Mock()
    mock.patch('server.external_scheduler._get_client').start(
    ).return_value = es_client_mock

    # Register bot.
    params = self.do_handshake()

    # Enqueue a task.
    self.set_as_user()
    resp, task_id = self.client_create_task_raw()
    task = resp['request']
    task_dimensions_flat = [
        '%s:%s' % (d['key'], d['value'])
        for d in task['task_slices'][0]['properties']['dimensions']
    ]

    run_id = task_id[:-1] + '1'
    self.set_as_bot()

    bot_dimensions_flat = task_queues.bot_dimensions_to_flat(
        params['dimensions'])

    # First, no task assigned.
    es_client_mock.AssignTasks.return_value = plugin_pb2.AssignTasksResponse()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    expected = {
        'duration': mock.ANY,
        'cmd': 'sleep',
        'quarantined': False,
    }
    self.assertEqual(expected, response)

    # Assert prpc calls.
    es_client_mock.AssignTasks.assert_called_once()
    es_client_mock.NotifyTasks.assert_called_once()
    assign_req = es_client_mock.AssignTasks.call_args[0][0]
    self.assertEqual(
        assign_req,
        plugin_pb2.AssignTasksRequest(
            scheduler_id=u'all',
            idle_bots=[
                plugin_pb2.IdleBot(
                    bot_id=u'bot1', dimensions=bot_dimensions_flat),
            ],
            time=assign_req.time,
        ))
    notify_req = es_client_mock.NotifyTasks.call_args[0][0]
    self.assertEqual(
        notify_req,
        plugin_pb2.NotifyTasksRequest(
            scheduler_id=u'all',
            notifications=[
                plugin_pb2.NotifyTasksItem(
                    task=plugin_pb2.TaskSpec(
                        id=task_id,
                        tags=task['tags'],
                        slices=[
                            plugin_pb2.SliceSpec(
                                dimensions=task_dimensions_flat),
                        ],
                        state=u'PENDING',
                        enqueued_time=notify_req.notifications[0].time,
                    ),
                    time=notify_req.notifications[0].time,
                ),
            ],
        ))
    es_client_mock.reset_mock()

    # Second, one task assigned.
    es_client_mock.AssignTasks.return_value = plugin_pb2.AssignTasksResponse(
        assignments=[
            plugin_pb2.TaskAssignment(
                bot_id=u'bot1', task_id=task_id, slice_number=0),
        ])
    es_client_mock.NotifyTasks.return_value = plugin_pb2.NotifyTasksResponse()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertEqual(response['cmd'], u'run')
    self.assertEqual(response['manifest']['task_id'], run_id)

    # Assert prpc calls.
    es_client_mock.AssignTasks.assert_called_once()
    es_client_mock.NotifyTasks.assert_called_once()
    notify_req = es_client_mock.NotifyTasks.call_args[0][0]
    self.assertEqual(
        notify_req,
        plugin_pb2.NotifyTasksRequest(
            scheduler_id=u'all',
            notifications=[
                plugin_pb2.NotifyTasksItem(
                    task=plugin_pb2.TaskSpec(
                        id=task_id,
                        tags=task['tags'],
                        slices=[
                            plugin_pb2.SliceSpec(
                                dimensions=task_dimensions_flat),
                        ],
                        state=u'RUNNING',
                        bot_id=u'bot1',
                        enqueued_time=notify_req.notifications[0].time,
                    ),
                    time=notify_req.notifications[0].time,
                ),
            ],
        ))
    es_client_mock.reset_mock()

    # Enqueue a task that has a parent task.
    self.set_as_user()
    _, child_task_id = self.client_create_task_raw(parent_task_id=run_id)
    child_run_id = child_task_id[:-1] + '1'
    self.set_as_bot()
    es_client_mock.AssignTasks.return_value = plugin_pb2.AssignTasksResponse(
        assignments=[
            plugin_pb2.TaskAssignment(
                bot_id=u'bot1', task_id=child_task_id, slice_number=0),
        ])
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertEqual(response['cmd'], u'run')
    self.assertEqual(response['manifest']['task_id'], child_run_id)

    # Enqueue a task that has a parent task and a grand parent task.
    self.set_as_user()
    _, grand_child_task_id = self.client_create_task_raw(
        parent_task_id=child_run_id)
    grand_child_run_id = grand_child_task_id[:-1] + '1'
    self.set_as_bot()
    es_client_mock.AssignTasks.return_value = plugin_pb2.AssignTasksResponse(
        assignments=[
            plugin_pb2.TaskAssignment(
                bot_id=u'bot1', task_id=grand_child_task_id, slice_number=0),
        ])
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertEqual(response['cmd'], u'run')
    self.assertEqual(response['manifest']['task_id'], grand_child_run_id)

    # TODO(crbug.com/1131822): add a test case for _ensure_active_slice() with
    # a task has multiple slices.

  def test_poll_idempotent(self):
    params = self.do_handshake(do_first_poll=True)

    self.set_as_user()
    self.mock(auth, 'has_permission', lambda *_args, **_kwargs: True)

    params['request_uuid'] = 'cf60878f-8f2a-4f1e-b1f5-8b5ec88813a9'
    task_res, _ = self.client_create_task_raw()
    run_id = task_res['task_id'][:-1] + '1'

    self.set_as_bot()
    res1 = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertEqual(res1['cmd'], 'run')
    self.assertEqual(res1['manifest']['task_id'], run_id)

    res2 = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertEqual(res1, res2)

  def test_claim_run(self):
    self.mock_pool_config(
        pool='default',
        rbe_migration=pools_pb2.Pool.RBEMigration(
            rbe_instance='some-instance',
            rbe_mode_percent=100,
        ),
    )
    self.mock(realms, 'check_tasks_create_in_realm', lambda *_: True)
    self.mock(realms, 'check_pools_create_task', lambda *_: True)
    self.mock(realms, 'check_tasks_act_as', lambda *_: True)

    to_run = []

    def mocked_rbe_enqueue(_request, task_to_run):
      to_run.append(task_to_run)

    self.mock(rbe, 'enqueue_rbe_task', mocked_rbe_enqueue)

    self.set_as_user()
    _, task_id = self.client_create_task_raw()
    run_result_id = task_id[:-1] + '1'
    self.assertEqual(len(to_run), 1)

    self.set_as_bot()
    params = self.do_handshake()
    params[u'claim_id'] = 'some-claim-id'
    params[u'task_id'] = task_id
    params[u'task_to_run_shard'] = to_run[0].shard_index
    params[u'task_to_run_id'] = to_run[0].key.id()
    response = self.post_json('/swarming/api/v1/bot/claim', params)
    self.assertEqual(
        response, {
            u'cmd': u'run',
            u'manifest': {
                u'bot_authenticated_as': u'bot:whitelisted-ip',
                u'bot_dimensions': {
                    u'id': [u'bot1'],
                    u'os': [u'Amiga'],
                    u'pool': [u'default']
                },
                u'bot_id': u'bot1',
                u'caches': [],
                u'cas_input_root': None,
                u'cipd_input': {
                    u'client_package': {
                        u'package_name': u'infra/tools/cipd/${platform}',
                        u'path': None,
                        u'version': u'git_revision:deadbeef'
                    },
                    u'packages': [{
                        u'package_name': u'rm',
                        u'path': u'bin',
                        u'version': u'git_revision:deadbeef'
                    }],
                    u'server':
                    u'https://pool.config.cipd.example.com'
                },
                u'command': [u'python', u'run_test.py'],
                u'containment': {
                    u'containment_type': 2
                },
                u'dimensions': {
                    u'os': [u'Amiga'],
                    u'pool': [u'default']
                },
                u'env': {},
                u'env_prefixes': {},
                u'grace_period': 30,
                u'hard_timeout': 3600,
                u'host': u'http://localhost:8080',
                u'io_timeout': 1200,
                u'outputs': [u'foo', u'path/to/foobar'],
                u'realm': {},
                u'relative_cwd': None,
                u'resultdb': None,
                u'secret_bytes': None,
                u'service_accounts': {
                    u'system': {
                        u'service_account': u'none'
                    },
                    u'task': {
                        u'service_account': u'none'
                    }
                },
                u'task_id': run_result_id,
            },
        })

    # Submitted the bot event.
    events = list(bot_management.get_events_query('bot1'))
    self.assertEqual(events[-1].event_type, 'request_task')
    self.assertEqual(events[-1].task_id, run_result_id)

  def test_claim_skip(self):
    params = self.do_handshake()
    params[u'claim_id'] = 'some-claim-id'
    params[u'task_id'] = '5cee400000010'  # doesn't exist
    params[u'task_to_run_shard'] = 1
    params[u'task_to_run_id'] = 1
    response = self.post_json('/swarming/api/v1/bot/claim', params)
    self.assertEqual(response, {
        u'cmd': u'skip',
        u'reason': u'No task slice',
    })

  def test_claim_bad_task_to_run(self):
    params = self.do_handshake()
    params[u'claim_id'] = 'some-claim-id'
    params[u'task_id'] = '?????'
    params[u'task_to_run_shard'] = 1
    params[u'task_to_run_id'] = 1
    response = self.app.post_json('/swarming/api/v1/bot/claim',
                                  params,
                                  expect_errors=True)
    self.assertEqual(response.status_code, 400)

  def test_complete_task_cas_output_root(self):
    # Successfully poll a task.
    self.mock(random, 'getrandbits', lambda _: 0x88)
    # A bot polls, gets a task, updates it, completes it.
    params = self.do_handshake(do_first_poll=True)
    # Enqueue a task.
    self.set_as_user()
    _, task_id = self.client_create_task_cas_input_root()
    self.assertEqual('0', task_id[-1])

    # Convert TaskResultSummary reference to TaskRunResult.
    task_id = task_id[:-1] + '1'
    self.set_as_bot()
    self.post_json('/swarming/api/v1/bot/poll', params)

    # Complete the task.
    params = {
        'cost_usd': 0.1,
        'duration': 3.,
        'bot_overhead': 0.1,
        'exit_code': 0,
        'id': 'bot1',
        'cache_trim_stats': {
            'duration': 0.1,
        },
        'cipd_stats': {
            'duration': 0.1,
        },
        'named_caches_stats': {
            'install': {
                'duration': 0.1,
            },
            'uninstall': {
                'duration': 0.1,
            },
        },
        'isolated_stats': {
            'download': {
                'duration': 0.1,
                'initial_number_items': 10,
                'initial_size': 1000,
                'items_cold': '',
                'items_hot': '',
            },
            'upload': {
                'duration': 0.1,
                'items_cold': '',
                'items_hot': '',
            },
        },
        'cleanup_stats': {
            'duration': 0.1,
        },
        'output': base64.b64encode('Ahahah'),
        'output_chunk_start': 0,
        'cas_output_root': {
            'cas_instance': 'projects/test/instances/default',
            'digest': {
                'hash': '12345',
                'size_bytes': 1,
            }
        },
        'cipd_pins': {
            'client_package': {
                'package_name': 'infra/tools/cipd/windows-amd64',
                'version': 'deadbeef' * 5,
            },
            'packages': [{
                'package_name': 'rm',
                'path': 'bin',
                'version': 'badc0fee' * 5,
            }]
        },
        'task_id': task_id,
    }
    response = self.post_json('/swarming/api/v1/bot/task_update', params)
    self.assertEqual({u'must_stop': False, u'ok': True}, response)

    self.set_as_user()
    response = self.client_get_results(task_id, include_performance_stats=True)
    expected = self.gen_run_result(
        bot_idle_since_ts=fmtdate(self.now),
        cas_output_root={
            'cas_instance': 'projects/test/instances/default',
            'digest': {
                'hash': '12345',
                'size_bytes': '1',
            }
        },
        cipd_pins={
            'client_package': {
                'package_name': 'infra/tools/cipd/windows-amd64',
                'version': 'deadbeef' * 5,
            },
            'packages': [{
                'package_name': 'rm',
                'path': 'bin',
                'version': 'badc0fee' * 5,
            }]
        },
        completed_ts=fmtdate(self.now),
        costs_usd=[0.1],
        created_ts=fmtdate(self.now),
        duration=3.0,
        performance_stats={
            u'bot_overhead': 0.1,
            u'cache_trim': {
                u'duration': 0.1,
            },
            u'package_installation': {
                u'duration': 0.1,
            },
            u'named_caches_install': {
                u'duration': 0.1,
            },
            u'named_caches_uninstall': {
                u'duration': 0.1,
            },
            u'isolated_download': {
                u'duration': 0.1,
                u'initial_number_items': u'10',
                u'initial_size': u'1000',
            },
            u'isolated_upload': {
                u'duration': 0.1,
            },
            u'cleanup': {
                u'duration': 0.1,
            },
        },
        exit_code=u'0',
        modified_ts=fmtdate(self.now),
        started_ts=fmtdate(self.now),
        state=u'COMPLETED')
    self.assertEqual(expected, response)

  def test_bot_ereporter2_error(self):
    # ereporter2's //client/utils/on_error.py traps unhandled exceptions
    # automatically.
    self.mock(random, 'getrandbits', lambda _: 0x88)
    errors = []
    self.mock(
        ereporter2,
        'log_request', lambda *args, **kwargs: errors.append((args, kwargs)))
    params = self.do_handshake()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertTrue(response.pop(u'duration'))
    expected = {
        u'cmd': u'sleep',
        u'quarantined': False,
    }
    self.assertEqual(expected, response)

    # The bot fails somehow.
    error_params = {
        'v': 1,
        'r': {
            'args': ['a', 'b'],
            'category': 'junk',
            'cwd': '/root',
            'duration': 0.1,
            'endpoint': '/root',
            'env': {
                'a': 'b'
            },
            'exception_type': 'FooError',
            'hostname': 'localhost',
            'message': 'Something happened',
            'method': 'GET',
            'os': 'Amiga',
            'params': {
                'a': 123
            },
            'python_version': '2.7',
            'request_id': '123',
            'source': params['dimensions']['id'][0],
            'source_ip': '127.0.0.1',
            'stack': 'stack trace...',
            'user': 'Joe',
            'version': '12',
        },
    }
    ereporter2_app = webtest.TestApp(
        webapp2.WSGIApplication(ereporter2.get_frontend_routes(), debug=True),
        extra_environ={
            'REMOTE_ADDR': self.source_ip,
            'SERVER_SOFTWARE': os.environ['SERVER_SOFTWARE'],
        })
    response = ereporter2_app.post_json('/ereporter2/api/v1/on_error',
                                        error_params)
    self.assertTrue('id' in response.json)
    error_id = response.json['id']
    expected = {
        u'id': error_id,
        u'url': u'http://localhost/restricted/ereporter2/errors/%s' % error_id,
    }
    self.assertEqual(expected, response.json)

    # A bot error currently does not result in permanent quarantine. It will
    # eventually.
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertTrue(response.pop(u'duration'))
    expected = {
        u'cmd': u'sleep',
        u'quarantined': False,
    }
    self.assertEqual(expected, response)
    self.assertEqual([], errors)

  def test_bot_event_error(self):
    # Native bot error reporting.
    self.mock(random, 'getrandbits', lambda _: 0x88)
    errors = []
    self.mock(
        ereporter2,
        'log_request', lambda *args, **kwargs: errors.append((args, kwargs)))
    params = self.do_handshake()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertTrue(response.pop(u'duration'))
    expected = {
        u'cmd': u'sleep',
        u'quarantined': False,
    }
    self.assertEqual(expected, response)

    # The bot fails somehow.
    params2 = self.do_handshake()
    params2['event'] = 'bot_error'
    params2['message'] = 'for the worst'
    response = self.post_json('/swarming/api/v1/bot/event', params2)
    self.assertEqual({}, response)

    # A bot error currently does not result in permanent quarantine. It will
    # eventually.
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertTrue(response.pop(u'duration'))
    expected = {
        u'cmd': u'sleep',
        u'quarantined': False,
    }
    self.assertEqual(expected, response)
    expected = [
        {
            'message': u'for the worst\n\n'
                       'https://test-swarming.appspot.com/restricted/bot/bot1',
            'source': 'bot',
        },
    ]
    self.assertEqual(expected, [e[1] for e in errors])

  def test_bot_event(self):
    self.mock(random, 'getrandbits', lambda _: 0x88)
    ticker = test_case.Ticker(self.now)
    ticks = []
    ticks.append(self.mock_now(ticker()))
    params = self.do_handshake()
    dimensions = params['dimensions']
    for e in handlers_bot.BotEventHandler.ALLOWED_EVENTS:
      if e == 'bot_error':
        # This one is tested specifically since it also logs an error message.
        continue
      params['event'] = e
      params['message'] = 'for the best'
      ticks.append(self.mock_now(ticker()))
      response = self.post_json('/swarming/api/v1/bot/event', params)
      self.assertEqual({}, response)

    # TODO(maruel): Replace with client api to query last BotEvent.
    actual = [e.to_dict() for e in bot_management.get_events_query('bot1')]
    events_to_check = reversed([
        event for event in handlers_bot.BotEventHandler.ALLOWED_EVENTS
        if event != "bot_error"
    ])
    expected = [
        {
            'authenticated_as': u'bot:whitelisted-ip',
            'dimensions': dimensions,
            'event_type': unicode(event),
            'external_ip': u'192.168.2.2',
            'idle_since_ts': None,
            'last_seen_ts': ticks[-(idx + 1)],  # see `ts` below
            'lease_id': None,
            'lease_expiration_ts': None,
            'leased_indefinitely': None,
            'machine_type': None,
            'machine_lease': None,
            'message': u'for the best',
            'quarantined': False,
            'maintenance_msg': None,
            'state': {
                u'bot_group_cfg_version': u'default',
                u'running_time': 1234.0,
                u'sleep_streak': 0,
                u'started_ts': 1410990411.111,
            },
            'task_id': None,
            # explanation of this index:
            # ticks[0] is the timestamp for the bot_connected event so there
            # will be len(ALLOWED_EVENTS)+1 in the ticks list.
            # -(idx+1) is the same as reverse(ticks)[idx+1]
            'ts': ticks[-(idx + 1)],
            'version': self.bot_version,
        } for idx, event in enumerate(events_to_check)
    ]
    expected.append({
        'authenticated_as': u'bot:whitelisted-ip',
        'dimensions': dimensions,
        'event_type': u'bot_connected',
        'external_ip': u'192.168.2.2',
        'idle_since_ts': None,
        'last_seen_ts': ticks[0],
        'lease_id': None,
        'lease_expiration_ts': None,
        'leased_indefinitely': None,
        'machine_type': None,
        'machine_lease': None,
        'message': None,
        'quarantined': False,
        'maintenance_msg': None,
        'state': {
            u'handshaking': True,
            u'running_time': 1234.0,
            u'sleep_streak': 0,
            u'started_ts': 1410990411.111,
        },
        'task_id': None,
        'ts': ticks[0],
        'version': u'123',
    })

    self.assertEqual(expected, actual)

  def test_bot_event_bad_request(self):
    params = self.do_handshake()
    params['event'] = 'bot_error'
    params['message'] = 'I have an invalid maintenance message.'
    params['state']['maintenance'] = True  # non-string maintenance message.
    self.post_json('/swarming/api/v1/bot/event', params, status=400)

  def test_task_complete(self):
    # Runs a task up to completion.
    self.mock(random, 'getrandbits', lambda _: 0x88)
    params = self.do_handshake(do_first_poll=True)
    self.set_as_user()
    self.client_create_task_raw(
        properties=dict(command=['python', 'runtest.py']))

    def _params(**kwargs):
      out = {
          'cost_usd': 0.1,
          'duration': None,
          'exit_code': None,
          'id': 'bot1',
          'output': None,
          'output_chunk_start': 0,
          'task_id': task_id,
      }
      out.update(**kwargs)
      return out

    def _cycle(params, expected, must_stop):
      """Cycles between bot update and user retrieving results."""
      self.set_as_bot()
      response = self.post_json('/swarming/api/v1/bot/task_update', params)
      self.assertEqual({u'must_stop': must_stop, u'ok': True}, response)
      self.set_as_user()
      self.assertEqual(expected, self.client_get_results(task_id))

    # 1. Initial task update with no data.
    self.set_as_bot()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    task_id = response['manifest']['task_id']
    params = _params()
    response = self.post_json('/swarming/api/v1/bot/task_update', params)
    self.assertEqual({u'must_stop': False, u'ok': True}, response)

    self.set_as_user()
    response = self.client_get_results(task_id)
    expected = self.gen_run_result(
        bot_idle_since_ts=fmtdate(self.now),
        costs_usd=[0.1],
        created_ts=fmtdate(self.now),
        modified_ts=fmtdate(self.now),
        started_ts=fmtdate(self.now))
    self.assertEqual(expected, response)

    # 2. Task update with some output.
    params = _params(output=base64.b64encode('Oh '))
    self.assertEqual(expected, response)
    _cycle(params, expected, False)

    # 3. Task update with some more output.
    params = _params(output=base64.b64encode('hi'), output_chunk_start=3)
    _cycle(params, expected, False)

    # 4. Task update with completion of the command.
    params = _params(
        duration=0.1, exit_code=23, output=base64.b64encode('Ahahah'))
    expected = self.gen_run_result(
        bot_idle_since_ts=fmtdate(self.now),
        completed_ts=fmtdate(self.now),
        costs_usd=[0.1],
        created_ts=fmtdate(self.now),
        duration=0.1,
        exit_code=u'23',
        failure=True,
        modified_ts=fmtdate(self.now),
        started_ts=fmtdate(self.now),
        state=u'COMPLETED')
    _cycle(params, expected, False)

  def test_task_update_db_failure(self):
    # The error is caught in task_scheduler.bot_update_task().
    self.set_as_bot()
    self.bot_poll()

    self.set_as_user()
    self.client_create_task_raw(
        properties=dict(command=['python', 'runtest.py']))

    self.set_as_bot()
    params = self.do_handshake()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    task_id = response['manifest']['task_id']

    def r(*_):
      raise datastore_errors.Timeout('Sorry!')

    self.mock(ndb, 'put_multi', r)
    params = {
        'cost_usd': 0.1,
        'duration': 0.1,
        'exit_code': 0,
        'id': 'bot1',
        'output': base64.b64encode('result string'),
        'output_chunk_start': 0,
        'task_id': task_id,
    }
    response = self.post_json(
        '/swarming/api/v1/bot/task_update', params, status=500)
    self.assertEqual({u'error': u'Failed to update, please retry'}, response)

  def test_task_update_failure(self):
    self.set_as_bot()
    self.bot_poll()

    self.set_as_user()
    self.client_create_task_raw(
        properties=dict(command=['python', 'runtest.py']))

    self.set_as_bot()
    params = self.do_handshake()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    task_id = response['manifest']['task_id']

    class NewError(Exception):
      pass

    def r(*_):
      raise NewError('Sorry!')

    self.mock(ndb, 'put_multi', r)
    params = {
        'bot_overhead': 0.,
        'cost_usd': 0.1,
        'duration': 0.1,
        'exit_code': 0,
        'id': 'bot1',
        'isolated_stats': {
            'download': {},
        },
        'output': base64.b64encode('result string'),
        'output_chunk_start': 0,
        'task_id': task_id,
    }
    response = self.post_json(
        '/swarming/api/v1/bot/task_update', params, status=500)
    self.assertEqual({u'error': u'Sorry!'}, response)

  def test_task_failure(self):
    self.mock(random, 'getrandbits', lambda _: 0x88)
    params = self.do_handshake(do_first_poll=True)

    self.set_as_user()
    self.client_create_task_raw()

    self.set_as_bot()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    task_id = response['manifest']['task_id']

    params = {
        'cost_usd': 0.1,
        'duration': 0.1,
        'exit_code': 1,
        'id': 'bot1',
        'output': base64.b64encode('result string'),
        'output_chunk_start': 0,
        'task_id': task_id,
    }
    self.post_json('/swarming/api/v1/bot/task_update', params)

    self.set_as_user()
    response = self.client_get_results(task_id)
    expected = self.gen_run_result(
        bot_idle_since_ts=fmtdate(self.now),
        completed_ts=fmtdate(self.now),
        costs_usd=[0.1],
        created_ts=fmtdate(self.now),
        duration=0.1,
        exit_code=u'1',
        failure=True,
        modified_ts=fmtdate(self.now),
        started_ts=fmtdate(self.now),
        state=u'COMPLETED')
    self.assertEqual(expected, response)

  def test_task_internal_failure(self):
    # E.g. task_runner blew up.
    self.mock(random, 'getrandbits', lambda _: 0x88)
    errors = []
    self.mock(
        ereporter2,
        'log_request', lambda *args, **kwargs: errors.append((args, kwargs)))
    params = self.do_handshake(do_first_poll=True)

    self.set_as_user()
    self.client_create_task_raw()

    self.set_as_bot()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    task_id = response['manifest']['task_id']

    # Let's say it failed to start task_runner because the new bot code is
    # broken. The end result is still BOT_DIED. The big change is that it
    # doesn't need to wait for a cron job to set this status.
    params = {
        'id': params['dimensions']['id'][0],
        'message': 'Oh',
        'task_id': task_id,
        'client_error': {
            'missing_cas': None,
            'missing_cipd': [],
        },
    }
    self.post_json('/swarming/api/v1/bot/task_error', params)

    self.set_as_user()
    response = self.client_get_results(task_id)
    expected = self.gen_run_result(
        bot_idle_since_ts=fmtdate(self.now),
        abandoned_ts=fmtdate(self.now),
        completed_ts=fmtdate(self.now),
        created_ts=fmtdate(self.now),
        internal_failure=True,
        modified_ts=fmtdate(self.now),
        started_ts=fmtdate(self.now),
        state=u'BOT_DIED')
    self.assertEqual(expected, response)
    self.assertEqual(1, len(errors))
    expected = [
        {
            'message':
                u'Bot: https://test-swarming.appspot.com/restricted/bot/bot1\n'
                'Task failed: '
                'https://test-swarming.appspot.com/user/task/5cee488008811\nOh',
            'source': 'bot',
        },
    ]
    self.assertEqual(expected, [e[1] for e in errors])

  def test_task_canceled(self):
    # Task was canceled while running, resulting in KILLED.
    self.mock(random, 'getrandbits', lambda _: 0x88)
    params = self.do_handshake(do_first_poll=True)

    self.set_as_user()
    self.client_create_task_raw()

    self.set_as_bot()
    params = self.do_handshake()
    response = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertEqual(u'run', response.get(u'cmd'))
    task_id = response['manifest']['task_id']

    # Cancel the task while it's running.
    self.set_as_user()
    response = self.client_cancel_task(task_id, True)
    self.assertEqual({u'ok': True, u'was_running': True}, response)

    # Now tagged with abandonned_ts, but state not yet updated.
    expected = self.gen_run_result(
        abandoned_ts=fmtdate(self.now),
        bot_idle_since_ts=fmtdate(self.now),
        created_ts=fmtdate(self.now),
        modified_ts=fmtdate(self.now),
        started_ts=fmtdate(self.now))
    response = self.client_get_results(task_id)
    self.assertEqual(expected, response)

    self.set_as_bot()
    response = self.bot_complete_task(task_id=task_id)
    self.assertEqual({u'must_stop': True, u'ok': True}, response)

    self.set_as_user()
    expected = self.gen_run_result(
        abandoned_ts=fmtdate(self.now),
        bot_idle_since_ts=fmtdate(self.now),
        completed_ts=fmtdate(self.now),
        costs_usd=[0.1],
        created_ts=fmtdate(self.now),
        duration=0.1,
        exit_code=u'0',
        modified_ts=fmtdate(self.now),
        started_ts=fmtdate(self.now),
        state=u'KILLED')
    response = self.client_get_results(task_id)
    self.assertEqual(expected, response)

    # Canceling again is denied.
    response = self.client_cancel_task(task_id, False)
    self.assertEqual({u'ok': False, u'was_running': False}, response)

    expected = self.gen_run_result(
        abandoned_ts=fmtdate(self.now),
        bot_idle_since_ts=fmtdate(self.now),
        completed_ts=fmtdate(self.now),
        costs_usd=[0.1],
        created_ts=fmtdate(self.now),
        duration=0.1,
        exit_code=u'0',
        modified_ts=fmtdate(self.now),
        started_ts=fmtdate(self.now),
        state=u'KILLED')
    response = self.client_get_results(task_id)
    self.assertEqual(expected, response)

  def test_bot_code_as_bot(self):
    self.mock(bot_code, 'get_bot_version', lambda _: ('0' * 64, None, 'rev1'))
    code = self.app.get('/swarming/api/v1/bot/bot_code/' + '0' * 64)
    expected = {'config/bot_config.py',
                'config/config.json'}.union(bot_archive.FILES)
    with zipfile.ZipFile(StringIO.StringIO(code.body), 'r') as z:
      self.assertEqual(expected, set(z.namelist()))

  def test_bot_code_as_bot_query_string(self):
    self.mock(bot_code, 'get_bot_version', lambda _: ('0' * 64, None, 'rev1'))
    self.app.get(
        '/swarming/api/v1/bot/bot_code/' + '0' * 64 + '?bot_id=1', status=302)

  def test_bot_code_without_token(self):
    self.set_as_anonymous()
    self.app.get('/bot_code', status=403)

  def test_bot_code_with_token(self):
    self.mock(bot_code, 'get_bot_version', lambda _: ('0' * 64, None, 'rev1'))
    self.set_as_anonymous()
    tok = bot_code.generate_bootstrap_token()
    self.app.get('/bot_code?tok=%s' % tok, status=302)

  def test_bot_code_redirect(self):
    self.mock(bot_code, 'get_bot_version', lambda _: ('0' * 64, None, 'rev1'))
    response = self.app.get('/bot_code')
    self.assertEqual(response.status_int, 302)  # Found
    self.assertEqual(
        response.location,
        'http://localhost/swarming/api/v1/bot/bot_code/' + '0' * 64)

  def test_bot_code_wrong_version(self):
    self.mock(bot_code, 'get_bot_version', lambda _: ('0' * 64, None, 'rev1'))
    response = self.app.get(
        '/swarming/api/v1/bot/bot_code/' + '1' * 64,
        headers={'X-Luci-Swarming-Bot-ID': 'bot1'})
    self.assertEqual(response.status_int, 302)  # Found
    self.assertEqual(response.location, 'http://localhost/bot_code?bot_id=bot1')

  def test_bot_code_wrong_version_without_bot_id(self):
    self.mock(bot_code, 'get_bot_version', lambda _: ('0' * 64, None))
    response = self.app.get('/swarming/api/v1/bot/bot_code/' + '1' * 64)
    self.assertEqual(response.status_int, 302)  # Found
    self.assertEqual(response.location, 'http://localhost/bot_code')

  FAKE_BOT_ID = 'bot1'
  FAKE_SCOPES = ['scope_a', 'scope_b']
  FAKE_AUDIENCE = 'https://example.com'

  def gen_token_request(self, endpoint, **kwargs):
    out = {'account_id': 'system', 'id': self.FAKE_BOT_ID}
    if endpoint == 'oauth_token':
      out['scopes'] = self.FAKE_SCOPES
    elif endpoint == 'id_token':
      out['audience'] = self.FAKE_AUDIENCE
    else:
      self.fail('Invalid token endpoint %r' % (endpoint,))
    out.update(kwargs)
    return out

  def test_oauth_token_bad_scopes(self):
    self.set_as_bot()
    response = self.app.post_json(
        '/swarming/api/v1/bot/oauth_token',
        params=self.gen_token_request('oauth_token', scopes='not a list'),
        expect_errors=True)
    self.assertEqual(400, response.status_code)
    self.assertEqual({u'error': u'"scopes" must be a list of strings'},
                     response.json)

  def test_id_token_bad_audience(self):
    self.set_as_bot()
    response = self.app.post_json(
        '/swarming/api/v1/bot/id_token',
        params=self.gen_token_request('id_token', audience=123),
        expect_errors=True)
    self.assertEqual(400, response.status_code)
    self.assertEqual({u'error': u'"audience" must be a string'},
                     response.json)

  @parameterized.expand(['oauth_token', 'id_token'])
  def test_token_unknown_account_id(self, endpoint):
    self.set_as_bot()
    response = self.app.post_json(
        '/swarming/api/v1/bot/%s' % endpoint,
        params=self.gen_token_request(endpoint, account_id='blah'),
        expect_errors=True)
    self.assertEqual(400, response.status_code)
    self.assertEqual(
        {u'error': u'Unknown "account_id", expecting "task" or "system"'},
        response.json)

  @parameterized.expand(['oauth_token', 'id_token'])
  def test_token_task_account_without_task_id(self, endpoint):
    self.set_as_bot()
    response = self.app.post_json(
        '/swarming/api/v1/bot/%s' % endpoint,
        params=self.gen_token_request(endpoint, account_id='task'),
        expect_errors=True)
    self.assertEqual(400, response.status_code)
    self.assertEqual(
        {u'error': u'"task_id" is required when using "account_id" == "task"'},
        response.json)

  @parameterized.expand([
      (
          'oauth_token',
          'access_token',
          service_accounts.TOKEN_KIND_ACCESS_TOKEN,
          FAKE_SCOPES,
          None,
      ),
      (
          'id_token',
          'id_token',
          service_accounts.TOKEN_KIND_ID_TOKEN,
          None,
          FAKE_AUDIENCE,
      ),
  ])
  def test_token_task_account(self, endpoint, response_key,
                              expected_kind, expected_scopes,
                              expected_audience):
    calls = []

    def mocked(task_id, bot_id, kind, scopes=None, audience=None):
      calls.append(task_id)
      self.assertEqual(self.FAKE_BOT_ID, bot_id)
      self.assertEqual(expected_kind, kind)
      self.assertEqual(expected_scopes, scopes)
      self.assertEqual(expected_audience, audience)
      return 'blah@example.com', service_accounts.AccessToken('blah', 126240504)

    self.mock(service_accounts, 'has_token_server', lambda: True)
    self.mock(service_accounts, 'get_task_account_token', mocked)
    self.mock_default_pool_acl(['blah@example.com'])
    self.mock(realms, 'check_tasks_create_in_realm', lambda *_: True)
    self.mock(realms, 'check_pools_create_task', lambda *_: True)
    self.mock(realms, 'check_tasks_act_as', lambda *_: True)

    self.set_as_bot()
    resp = self.bot_poll()

    # Create a task.
    self.set_as_user()
    _, task_summary_id = self.client_create_task_raw(
        service_account='blah@example.com',
        realm='test:task_realm')
    self.assertEqual('0', task_summary_id[-1])
    # Convert TaskResultSummary reference to TaskRunResult.
    task_id = task_summary_id[:-1] + '1'

    # Make the bot grab it.
    self.set_as_bot()
    resp = self.bot_poll()
    self.assertEqual(u'bot1', resp['manifest']['bot_id'])
    self.assertEqual(task_id, resp['manifest']['task_id'])

    # Pass wrong task_id.
    response = self.app.post_json(
        '/swarming/api/v1/bot/%s' % endpoint,
        params=self.gen_token_request(
            endpoint,
            account_id='task',
            task_id=task_id[:-1] + '2',
        ),
        expect_errors=True)
    self.assertEqual(400, response.status_code)
    self.assertEqual(
        {u'error': u'Wrong task_id: the bot is not executing this task'},
        response.json)
    self.assertFalse(calls)

    # Pass correct task_id this time.
    response = self.app.post_json(
        '/swarming/api/v1/bot/%s' % endpoint,
        params=self.gen_token_request(
            endpoint,
            account_id='task',
            task_id=task_id,
        ))
    self.assertEqual(
        {
            response_key: u'blah',
            u'expiry': 126240504,
            u'service_account': u'blah@example.com',
        }, response.json)
    self.assertEqual([task_id], calls)

    # Emulate fatal error.
    def mocked(*_args, **_kwargs):
      raise service_accounts.PermissionError('Fatal error')

    self.mock(service_accounts, 'get_task_account_token', mocked)

    response = self.app.post_json(
        '/swarming/api/v1/bot/%s' % endpoint,
        params=self.gen_token_request(
            endpoint,
            account_id='task',
            task_id=task_id,
        ),
        expect_errors=True)
    self.assertEqual(403, response.status_code)
    self.assertEqual({u'error': u'Fatal error'}, response.json)

    # Emulate transient error.
    def mocked(*_args, **_kwargs):
      raise service_accounts.InternalError('Transient error')

    self.mock(service_accounts, 'get_task_account_token', mocked)

    response = self.app.post_json(
        '/swarming/api/v1/bot/%s' % endpoint,
        params=self.gen_token_request(
            endpoint,
            account_id='task',
            task_id=task_id,
        ),
        expect_errors=True)
    self.assertEqual(500, response.status_code)
    self.assertEqual({u'error': u'Transient error'}, response.json)

  @parameterized.expand([
      (
          'oauth_token',
          'access_token',
          service_accounts.TOKEN_KIND_ACCESS_TOKEN,
          FAKE_SCOPES,
          None,
      ),
      (
          'id_token',
          'id_token',
          service_accounts.TOKEN_KIND_ID_TOKEN,
          None,
          FAKE_AUDIENCE,
      ),
  ])
  def test_token_system_account(self, endpoint, response_key,
                                expected_kind, expected_scopes,
                                expected_audience):
    self.set_as_bot()
    self.mock_bot_group_config(
        version='default',
        auth=(bot_groups_config.BotAuth(
            log_if_failed=False,
            require_luci_machine_token=False,
            require_service_account=None,
            require_gce_vm_token=None,
            ip_whitelist=None,
        ),),
        dimensions={},
        system_service_account='system@example.com',
        is_default=True)

    calls = []

    def mocked(account, kind, scopes=None, audience=None):
      calls.append(account)
      self.assertEqual(expected_kind, kind)
      self.assertEqual(expected_scopes, scopes)
      self.assertEqual(expected_audience, audience)
      return account, service_accounts.AccessToken('blah', 126240504)

    self.mock(service_accounts, 'get_system_account_token', mocked)

    response = self.app.post_json(
        '/swarming/api/v1/bot/%s' % endpoint,
        params=self.gen_token_request(endpoint, account_id='system')).json
    self.assertEqual(
        {
            response_key: u'blah',
            u'expiry': 126240504,
            u'service_account': u'system@example.com',
        }, response)
    self.assertEqual(['system@example.com'], calls)

  @parameterized.expand(['oauth_token', 'id_token'])
  def test_token_system_account_none(self, endpoint):
    self.set_as_bot()

    def mocked(account, *_args, **_kwargs):
      self.assertFalse(account)
      return 'none', None

    self.mock(service_accounts, 'get_system_account_token', mocked)

    response = self.app.post_json(
        '/swarming/api/v1/bot/%s' % endpoint,
        params=self.gen_token_request(endpoint, account_id='system')).json
    self.assertEqual({u'service_account': u'none'}, response)


if __name__ == '__main__':
  if '-v' in sys.argv:
    unittest.TestCase.maxDiff = None
  logging.basicConfig(
      level=logging.DEBUG if '-v' in sys.argv else logging.CRITICAL,
      format='%(levelname)-7s %(filename)s:%(lineno)3d %(message)s')
  unittest.main()
