# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import datetime
import hashlib
import json
import random
import StringIO
import unittest
import zipfile
import zlib

from google.appengine.ext import ndb
from testing_utils import testing

from components import auth
from components import utils

from cas import impl as cas_impl

from cipd import impl
from cipd import processing


class TestValidators(unittest.TestCase):
  def test_is_valid_package_name(self):
    self.assertTrue(impl.is_valid_package_name('a'))
    self.assertTrue(impl.is_valid_package_name('a/b'))
    self.assertTrue(impl.is_valid_package_name('a/b/c/1/2/3'))
    self.assertTrue(impl.is_valid_package_name('infra/tools/cipd'))
    self.assertTrue(impl.is_valid_package_name('-/_'))
    self.assertTrue(impl.is_valid_package_name('a/.b/..c/d./e..'))
    self.assertFalse(impl.is_valid_package_name(''))
    self.assertFalse(impl.is_valid_package_path('a//b'))
    self.assertFalse(impl.is_valid_package_name('/a'))
    self.assertFalse(impl.is_valid_package_name('a/'))
    self.assertFalse(impl.is_valid_package_name('A'))
    self.assertFalse(impl.is_valid_package_name('a/B'))
    self.assertFalse(impl.is_valid_package_name('a\\b'))
    self.assertFalse(impl.is_valid_package_name('./a'))
    self.assertFalse(impl.is_valid_package_name('a/../b'))
    self.assertFalse(impl.is_valid_package_path('a/.../b'))

  def test_is_valid_package_path(self):
    self.assertTrue(impl.is_valid_package_path('a'))
    self.assertTrue(impl.is_valid_package_path('a/b'))
    self.assertTrue(impl.is_valid_package_path('a/b/c/1/2/3'))
    self.assertTrue(impl.is_valid_package_path('infra/tools/cipd'))
    self.assertTrue(impl.is_valid_package_path('-/_'))
    self.assertTrue(impl.is_valid_package_path('a/.b/..c/d./e..'))
    self.assertFalse(impl.is_valid_package_path(''))
    self.assertFalse(impl.is_valid_package_path('a//b'))
    self.assertFalse(impl.is_valid_package_path('/a'))
    self.assertFalse(impl.is_valid_package_path('a/'))
    self.assertFalse(impl.is_valid_package_path('A'))
    self.assertFalse(impl.is_valid_package_path('a/B'))
    self.assertFalse(impl.is_valid_package_path('a\\b'))
    self.assertFalse(impl.is_valid_package_path('./a'))
    self.assertFalse(impl.is_valid_package_path('a/../b'))
    self.assertFalse(impl.is_valid_package_path('a/.../b'))

  def test_is_valid_instance_id(self):
    self.assertTrue(impl.is_valid_instance_id('a'*40))
    self.assertFalse(impl.is_valid_instance_id(''))
    self.assertFalse(impl.is_valid_instance_id('A'*40))

  def test_is_valid_package_ref(self):
    self.assertTrue(impl.is_valid_package_ref('ref'))
    self.assertTrue(impl.is_valid_package_ref('abc-_0123'))
    self.assertFalse(impl.is_valid_package_ref(''))
    self.assertFalse(impl.is_valid_package_ref('no-CAPS'))
    self.assertFalse(impl.is_valid_package_ref('a'*500))
    # Tags are not refs.
    self.assertFalse(impl.is_valid_package_ref('key:value'))
    self.assertFalse(impl.is_valid_package_ref('key:'))
    # Instance IDs are not refs.
    self.assertFalse(impl.is_valid_package_ref('a'*40))

  def test_is_valid_instance_tag(self):
    self.assertTrue(impl.is_valid_instance_tag('k:v'))
    self.assertTrue(impl.is_valid_instance_tag('key:'))
    self.assertTrue(impl.is_valid_instance_tag('key-_01234:#$%@\//%$SD'))
    self.assertFalse(impl.is_valid_instance_tag(''))
    self.assertFalse(impl.is_valid_instance_tag('key'))
    self.assertFalse(impl.is_valid_instance_tag('KEY:'))
    self.assertFalse(impl.is_valid_instance_tag('key:' + 'a'*500))

  def test_is_valid_counter_name(self):
    self.assertTrue(impl.is_valid_counter_name('cipd.installed'))
    self.assertTrue(impl.is_valid_counter_name('abc123-_.'))
    self.assertTrue(impl.is_valid_counter_name('a' * 300))
    self.assertFalse(impl.is_valid_counter_name('a' * 301))
    self.assertFalse(impl.is_valid_counter_name('ABC'))
    self.assertFalse(impl.is_valid_counter_name('k:v'))
    self.assertFalse(impl.is_valid_counter_name('a/b'))


class TestRepoService(testing.AppengineTestCase):
  maxDiff = None

  def setUp(self):
    super(TestRepoService, self).setUp()
    self.mocked_cas_service = MockedCASService()
    self.mock(impl.cas, 'get_cas_service', lambda: self.mocked_cas_service)
    self.service = impl.get_repo_service()

  def register_fake_instance(self, pkg_name, instance_id=None):
    _, registered = self.service.register_instance(
        package_name=pkg_name,
        instance_id=instance_id or 'a'*40,
        caller=auth.Identity.from_bytes('user:abc@example.com'),
        now=datetime.datetime(2014, 1, 1, 0, 0))
    self.assertTrue(registered)

  def test_delete_package_ok(self):
    caller = auth.Identity.from_bytes('user:abc@example.com')
    # Setup all sorts of stuff associated with a package.
    self.register_fake_instance('a/b', 'a'*40)
    self.register_fake_instance('a/b', 'b'*40)
    self.service.set_package_ref('a/b', 'ref1', 'a'*40, caller)
    self.service.set_package_ref('a/b', 'ref2', 'b'*40, caller)
    self.service.attach_tags('a/b', 'a'*40, ['tag1:tag1', 'tag2:tag2'], caller)
    self.service.attach_tags('a/b', 'b'*40, ['tag1:tag1', 'tag2:tag2'], caller)

    # Another package, to make sure it stays alive.
    self.register_fake_instance('c/d')

    # Delete a/b and all associated stuff. Ensure entire entity group is nuked.
    # The implementation of delete_package doesn't use this sort of query (and
    # use explicit list of entity classes) as a reminder for future developers
    # to be mindful about what they are deleting and how.
    q = 'SELECT __key__ WHERE ANCESTOR IS :1'
    self.assertTrue(ndb.gql(q, impl.package_key('a/b')).fetch())
    self.service.delete_package('a/b')
    self.assertFalse(ndb.gql(q, impl.package_key('a/b')).fetch())
    self.assertFalse(self.service.get_instance('a/b', 'a'*40))

    # Another package is still fine.
    self.assertTrue(self.service.get_instance('c/d', 'a'*40))

  def test_delete_package_missing(self):
    self.assertIsNone(self.service.get_package('a/b'))
    self.assertFalse(self.service.delete_package('a/b'))

  def test_list_packages_no_path(self):
    self.assertIsNone(self.service.get_package('a/b'))
    self.assertIsNone(self.service.get_package('y/z'))
    self.register_fake_instance('y/z')
    self.register_fake_instance('a/b')
    self.assertEqual(([], ['a', 'y']),
                     self.service.list_packages('', False))
    self.assertEqual((['a/b', 'y/z'], ['a', 'y']),
                     self.service.list_packages('', True))

  def test_list_packages_with_path(self):
    self.assertIsNone(self.service.get_package('a/b'))
    self.assertIsNone(self.service.get_package('y/x'))
    self.assertIsNone(self.service.get_package('y/z/z'))
    self.register_fake_instance('y/x')
    self.register_fake_instance('y/z/z')
    self.register_fake_instance('a/b')
    self.assertEqual((['y/x'], ['y/z']), self.service.list_packages('y', False))
    self.assertEqual((['y/z/z'], []),
                     self.service.list_packages('y/z/z', False))
    self.assertEqual((['y/x'], ['y/z']),
                     self.service.list_packages('y/', False))
    self.assertEqual((['y/x', 'y/z/z'], ['y/z']),
                     self.service.list_packages('y', True))

  def test_list_packages_ignore_substrings(self):
    self.assertIsNone(self.service.get_package('good/path'))
    self.register_fake_instance('good/path')
    self.assertEqual((['good/path'], []),
                     self.service.list_packages('good', False))
    self.assertEqual((['good/path'], []),
                     self.service.list_packages('good/', False))
    self.assertEqual(([], []),
                     self.service.list_packages('goo', False))

  def test_list_packages_where_a_package_is_also_a_directory(self):
    self.assertIsNone(self.service.get_package('good'))
    self.assertIsNone(self.service.get_package('good/path'))
    self.register_fake_instance('good')
    self.register_fake_instance('good/path')
    self.assertEqual((['good'], ['good']),
                     self.service.list_packages('', False))
    self.assertEqual((['good', 'good/path'], ['good']),
                     self.service.list_packages('', True))
    # To keep things simple we match packages with names matching the search
    # with the trailing slash stripped.
    self.assertEqual((['good', 'good/path'], []),
                     self.service.list_packages('good/', False))

  def test_list_packages_with_an_empty_directory(self):
    self.assertIsNone(self.service.get_package('good/sub/path'))
    self.register_fake_instance('good/sub/path')
    self.assertEqual(([], ['good/sub']),
                     self.service.list_packages('good', False))
    self.assertEqual((['good/sub/path'], ['good/sub']),
                     self.service.list_packages('good', True))
    self.assertEqual((['good/sub/path'], ['good', 'good/sub']),
                     self.service.list_packages('', True))

  def test_list_packages_with_hidden_packages(self):
    self.assertIsNone(self.service.get_package('a/b'))
    self.assertIsNone(self.service.get_package('a/c'))
    self.register_fake_instance('a/b')
    self.register_fake_instance('a/c')

    # Both are visible initially.
    self.assertEqual(
        (['a/b', 'a/c'], []),
        self.service.list_packages('a/', False, False))

    def mutation(pkg):
      pkg.hidden = True
      return True
    self.service.modify_package('a/c', mutation)

    # 'a/c' is no longer visible.
    self.assertEqual(
        (['a/b'], []),
        self.service.list_packages('a/', False, False))

    # Unless asked to show hidden packages.
    self.assertEqual(
        (['a/b', 'a/c'], []),
        self.service.list_packages('a/', False, True))

  def test_modify_package_ok(self):
    self.register_fake_instance('a/b')
    pkg = self.service.get_package('a/b')
    self.assertFalse(pkg.hidden)

    def mutation(pkg):
      pkg.hidden = True
      return True
    pkg = self.service.modify_package('a/b', mutation)
    self.assertTrue(pkg.hidden)

    pkg = self.service.get_package('a/b')
    self.assertTrue(pkg.hidden)

  def test_modify_package_unchanged(self):
    self.register_fake_instance('a/b')
    pkg = self.service.get_package('a/b')
    self.assertFalse(pkg.hidden)

    def mutation(pkg):
      pkg.hidden = True
      return False
    pkg = self.service.modify_package('a/b', mutation)
    self.assertTrue(pkg.hidden) # returns whatever 'mutation' did

    pkg = self.service.get_package('a/b')
    self.assertFalse(pkg.hidden) # the change wasn't persisted though

  def test_modify_package_missing(self):
    def mutation(_):
      return False # pragma: no cover
    pkg = self.service.modify_package('a/b', mutation)
    self.assertIsNone(pkg)

  def test_register_instance_new(self):
    self.assertIsNone(self.service.get_instance('a/b', 'a'*40))
    self.assertIsNone(self.service.get_package('a/b'))
    inst, registered = self.service.register_instance(
        package_name='a/b',
        instance_id='a'*40,
        caller=auth.Identity.from_bytes('user:abc@example.com'),
        now=datetime.datetime(2014, 1, 1, 0, 0))
    self.assertTrue(registered)
    self.assertEqual(
        ndb.Key('Package', 'a/b', 'PackageInstance', 'a'*40), inst.key)
    self.assertEqual('a/b', inst.package_name)
    self.assertEqual('a'*40, inst.instance_id)
    expected = {
      'registered_by': auth.Identity(kind='user', name='abc@example.com'),
      'registered_ts': datetime.datetime(2014, 1, 1, 0, 0),
      'processors_failure': [],
      'processors_pending': [],
      'processors_success': [],
    }
    self.assertEqual(expected, inst.to_dict())
    self.assertEqual(
        expected, self.service.get_instance('a/b', 'a'*40).to_dict())
    pkg = self.service.get_package('a/b')
    self.assertTrue(pkg)
    self.assertEqual('a/b', pkg.package_name)

  def test_register_instance_existing(self):
    # First register a package.
    inst1, registered = self.service.register_instance(
        package_name='a/b',
        instance_id='a'*40,
        caller=auth.Identity.from_bytes('user:abc@example.com'))
    self.assertTrue(registered)
    # Try to register it again.
    inst2, registered = self.service.register_instance(
          package_name='a/b',
          instance_id='a'*40,
          caller=auth.Identity.from_bytes('user:def@example.com'))
    self.assertFalse(registered)
    self.assertEqual(inst1.to_dict(), inst2.to_dict())

  def test_generate_fetch_url(self):
    inst, registered = self.service.register_instance(
        package_name='a/b',
        instance_id='a'*40,
        caller=auth.Identity.from_bytes('user:abc@example.com'),
        now=datetime.datetime(2014, 1, 1, 0, 0))
    self.assertTrue(registered)
    url = self.service.generate_fetch_url(inst)
    self.assertEqual(
        'https://signed-url/SHA1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', url)

  def test_is_instance_file_uploaded(self):
    self.mocked_cas_service.uploaded[('SHA1', 'a'*40)] = ''
    self.assertTrue(self.service.is_instance_file_uploaded('a/b', 'a'*40))
    self.assertFalse(self.service.is_instance_file_uploaded('a/b', 'b'*40))

  def test_create_upload_session(self):
    upload_url, upload_session_id = self.service.create_upload_session(
        'a/b', 'a'*40, auth.Identity.from_bytes('user:abc@example.com'))
    self.assertEqual('http://upload_url', upload_url)
    self.assertEqual('upload_session_id', upload_session_id)

  def test_register_instance_with_processing(self):
    self.mock(utils, 'utcnow', lambda: datetime.datetime(2014, 1, 1))

    self.service.processors.append(MockedProcessor('bad', 'Error message'))
    self.service.processors.append(MockedProcessor('good'))

    tasks = []
    def mocked_enqueue_task(**kwargs):
      tasks.append(kwargs)
      return True
    self.mock(impl.utils, 'enqueue_task', mocked_enqueue_task)

    # The processors are added to the pending list.
    inst, registered = self.service.register_instance(
        package_name='a/b',
        instance_id='a'*40,
        caller=auth.Identity.from_bytes('user:abc@example.com'),
        now=datetime.datetime(2014, 1, 1, 0, 0))
    self.assertTrue(registered)
    expected = {
      'registered_by': auth.Identity(kind='user', name='abc@example.com'),
      'registered_ts': datetime.datetime(2014, 1, 1, 0, 0),
      'processors_failure': [],
      'processors_pending': ['bad', 'good'],
      'processors_success': [],
    }
    self.assertEqual(expected, inst.to_dict())

    # The processing task is enqueued.
    self.assertEqual([{
      'payload': '{"instance_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", '
          '"package_name": "a/b", "processors": ["bad", "good"]}',
      'queue_name': 'cipd-process',
      'transactional': True,
      'url': '/internal/taskqueue/cipd-process/'
          'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
    }], tasks)

    # Now execute the task.
    self.service.process_instance(
        package_name='a/b',
        instance_id='a'*40,
        processors=['bad', 'good'])

    # Assert the final state.
    inst = self.service.get_instance('a/b', 'a'*40)
    expected = {
      'registered_by': auth.Identity(kind='user', name='abc@example.com'),
      'registered_ts': datetime.datetime(2014, 1, 1, 0, 0),
      'processors_failure': ['bad'],
      'processors_pending': [],
      'processors_success': ['good'],
    }
    self.assertEqual(expected, inst.to_dict())

    good_result = self.service.get_processing_result('a/b', 'a'*40, 'good')
    self.assertEqual({
      'created_ts': datetime.datetime(2014, 1, 1),
      'error': None,
      'result': {
        'instance_id': 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
        'package_name': 'a/b',
        'processor_name': 'good',
      },
      'success': True,
    }, good_result.to_dict())

    bad_result = self.service.get_processing_result('a/b', 'a'*40, 'bad')
    self.assertEqual({
      'created_ts': datetime.datetime(2014, 1, 1),
      'error': 'Error message',
      'result': None,
      'success': False,
    }, bad_result.to_dict())

  def test_client_binary_extraction(self):
    self.mock(utils, 'utcnow', lambda: datetime.datetime(2014, 1, 1))

    # Prepare fake cipd binary package.
    out = StringIO.StringIO()
    zf = zipfile.ZipFile(out, 'w', zipfile.ZIP_DEFLATED)
    zf.writestr('cipd', 'cipd binary data here')
    zf.close()
    zipped = out.getvalue()
    digest = hashlib.sha1(zipped).hexdigest()

    # Pretend it is uploaded.
    self.mocked_cas_service.uploaded[('SHA1', digest)] = zipped

    # Register it as a package instance.
    self.mock(impl.utils, 'enqueue_task', lambda **_args: True)
    inst, registered = self.service.register_instance(
        package_name='infra/tools/cipd/linux-amd64',
        instance_id=digest,
        caller=auth.Identity.from_bytes('user:abc@example.com'),
        now=datetime.datetime(2014, 1, 1, 0, 0))
    self.assertTrue(registered)
    expected = {
      'registered_by': auth.Identity(kind='user', name='abc@example.com'),
      'registered_ts': datetime.datetime(2014, 1, 1, 0, 0),
      'processors_failure': [],
      'processors_pending': ['cipd_client_binary:v1'],
      'processors_success': [],
    }
    self.assertEqual(expected, inst.to_dict())

    # get_client_binary_info indicated that processing is not done yet.
    instance = self.service.get_instance(
        package_name='infra/tools/cipd/linux-amd64',
        instance_id=digest)
    info, error_msg = self.service.get_client_binary_info(instance)
    self.assertIsNone(info)
    self.assertIsNone(error_msg)

    # Execute post-processing task: it would extract CIPD binary.
    self.service.process_instance(
        package_name='infra/tools/cipd/linux-amd64',
        instance_id=digest,
        processors=['cipd_client_binary:v1'])

    # Ensure succeeded.
    result = self.service.get_processing_result(
        package_name='infra/tools/cipd/linux-amd64',
        instance_id=digest,
        processor_name='cipd_client_binary:v1')
    self.assertEqual({
      'created_ts': datetime.datetime(2014, 1, 1, 0, 0),
      'success': True,
      'error': None,
      'result': {
        'client_binary': {
          'hash_algo': 'SHA1',
          'hash_digest': '5a72c1535f8d132c341585207504d94e68ef8a9d',
          'size': 21,
        },
      },
    }, result.to_dict())

    # Verify get_client_binary_info works too.
    instance = self.service.get_instance(
        package_name='infra/tools/cipd/linux-amd64',
        instance_id=digest)
    info, error_msg = self.service.get_client_binary_info(
        instance, filename='boo')
    expected = impl.ClientBinaryInfo(
        sha1='5a72c1535f8d132c341585207504d94e68ef8a9d',
        size=21,
        fetch_url=(
            'https://signed-url/SHA1/5a72c1535f8d132c341585207504d94e68ef8a9d'
            '?filename=boo'))
    self.assertIsNone(error_msg)
    self.assertEqual(expected, info)

  def test_client_binary_extract_failure(self):
    self.mock(utils, 'utcnow', lambda: datetime.datetime(2014, 1, 1))

    # Pretend some fake data is uploaded.
    self.mocked_cas_service.uploaded[('SHA1', 'a'*40)] = 'not a zip'

    # Register it as a package instance.
    self.mock(impl.utils, 'enqueue_task', lambda **_args: True)
    inst, registered = self.service.register_instance(
        package_name='infra/tools/cipd/linux-amd64',
        instance_id='a'*40,
        caller=auth.Identity.from_bytes('user:abc@example.com'),
        now=datetime.datetime(2014, 1, 1, 0, 0))
    self.assertTrue(registered)
    expected = {
      'registered_by': auth.Identity(kind='user', name='abc@example.com'),
      'registered_ts': datetime.datetime(2014, 1, 1, 0, 0),
      'processors_failure': [],
      'processors_pending': ['cipd_client_binary:v1'],
      'processors_success': [],
    }
    self.assertEqual(expected, inst.to_dict())

    # Execute post-processing task: it would fail extracting CIPD binary.
    self.service.process_instance(
        package_name='infra/tools/cipd/linux-amd64',
        instance_id='a'*40,
        processors=['cipd_client_binary:v1'])

    # Ensure error is reported.
    result = self.service.get_processing_result(
        package_name='infra/tools/cipd/linux-amd64',
        instance_id='a'*40,
        processor_name='cipd_client_binary:v1')
    self.assertEqual({
      'created_ts': datetime.datetime(2014, 1, 1, 0, 0),
      'success': False,
      'error': 'File is not a zip file',
      'result': None,
    }, result.to_dict())

    # Verify get_client_binary_info reports it too.
    instance = self.service.get_instance(
        package_name='infra/tools/cipd/linux-amd64',
        instance_id='a'*40)
    info, error_msg = self.service.get_client_binary_info(instance)
    self.assertIsNone(info)
    self.assertEqual(
        'Failed to extract the binary: File is not a zip file', error_msg)

  def test_compressed_processing_result(self):
    p = impl.ProcessingResult()
    p.result = zlib.compress(json.dumps({'a': 'b'}))
    self.assertEqual({'a': 'b'}, p.read_result())

  def test_set_package_ref(self):
    ident1 = auth.Identity.from_bytes('user:abc@example.com')
    now1 = datetime.datetime(2015, 1, 1, 0, 0)

    ident2 = auth.Identity.from_bytes('user:def@example.com')
    now2 = datetime.datetime(2016, 1, 1, 0, 0)

    self.service.register_instance(
        package_name='a/b',
        instance_id='a'*40,
        caller=ident1,
        now=datetime.datetime(2014, 1, 1, 0, 0))
    self.service.register_instance(
        package_name='a/b',
        instance_id='b'*40,
        caller=ident1,
        now=datetime.datetime(2014, 1, 1, 0, 0))

    ref = self.service.set_package_ref('a/b', 'ref', 'a'*40, ident1, now1)
    self.assertEqual({
      'instance_id': 'a'*40,
      'modified_by': ident1,
      'modified_ts': now1,
    }, ref.to_dict())
    self.assertEqual('ref', ref.ref)

    # Move to the same value -> modified_ts do not change.
    ref = self.service.set_package_ref('a/b', 'ref', 'a'*40, ident2, now2)
    self.assertEqual({
      'instance_id': 'a'*40,
      'modified_by': ident1,
      'modified_ts': now1,
    }, ref.to_dict())

    # Move to another value.
    ref = self.service.set_package_ref('a/b', 'ref', 'b'*40, ident2, now2)
    self.assertEqual({
      'instance_id': 'b'*40,
      'modified_by': ident2,
      'modified_ts': now2,
    }, ref.to_dict())

    # Code coverage for package_name.
    self.assertEqual('a/b', ref.package_name)

  def test_get_package_refs(self):
    ident = auth.Identity.from_bytes('user:abc@example.com')
    now1 = datetime.datetime(2015, 1, 1, 0, 0)
    now2 = datetime.datetime(2015, 1, 2, 0, 0)

    self.register_fake_instance('a/b', 'a'*40)
    self.service.set_package_ref('a/b', 'ref1', 'a'*40, ident, now1)

    self.register_fake_instance('a/b', 'b'*40)
    self.service.set_package_ref('a/b', 'ref2', 'b'*40, ident, now2)

    refs = self.service.get_package_refs('a/b', ['ref1', 'ref2', 'missing'])
    self.assertEqual({
      'missing': None,
      'ref1': {
        'instance_id': 'a'*40,
        'modified_by': ident,
        'modified_ts': now1,
      },
      'ref2': {
        'instance_id': 'b'*40,
        'modified_by': ident,
        'modified_ts': now2,
      },
    }, {k: v.to_dict() if v else None for k, v in refs.iteritems()})

  def test_query_package_refs(self):
    ident = auth.Identity.from_bytes('user:abc@example.com')
    now1 = datetime.datetime(2015, 1, 1, 0, 0)
    now2 = datetime.datetime(2015, 1, 2, 0, 0)

    self.register_fake_instance('a/b', 'a'*40)
    self.service.set_package_ref('a/b', 'ref1', 'a'*40, ident, now1)

    self.register_fake_instance('a/b', 'b'*40)
    self.service.set_package_ref('a/b', 'ref2', 'b'*40, ident, now2)

    refs = self.service.query_package_refs('a/b')
    self.assertEqual([
      {
        'instance_id': 'b'*40,
        'modified_by': ident,
        'modified_ts': now2,
      },
      {
        'instance_id': 'a'*40,
        'modified_by': ident,
        'modified_ts': now1,
      },
    ], [ref.to_dict() for ref in refs])

  def test_query_instance_refs(self):
    ident = auth.Identity.from_bytes('user:abc@example.com')
    now1 = datetime.datetime(2015, 1, 1, 0, 0)
    now2 = datetime.datetime(2015, 1, 2, 0, 0)

    self.register_fake_instance('a/b', 'a'*40)
    self.service.set_package_ref('a/b', 'ref1', 'a'*40, ident, now1)
    self.service.set_package_ref('a/b', 'ref2', 'a'*40, ident, now2)

    # Should not appear in results.
    self.register_fake_instance('a/b', 'b'*40)
    self.service.set_package_ref('a/b', 'ref3', 'b'*40, ident, now1)

    refs = self.service.query_instance_refs('a/b', 'a'*40)
    self.assertEqual(['ref2', 'ref1'], [ref.ref for ref in refs])

  def test_attach_detach_tags(self):
    _, registered = self.service.register_instance(
        package_name='a/b',
        instance_id='a'*40,
        caller=auth.Identity.from_bytes('user:abc@example.com'),
        now=datetime.datetime(2014, 1, 1, 0, 0))
    self.assertTrue(registered)

    # Add a tag.
    attached = self.service.attach_tags(
        package_name='a/b',
        instance_id='a'*40,
        tags=['tag1:value1'],
        caller=auth.Identity.from_bytes('user:abc@example.com'),
        now=datetime.datetime(2014, 1, 1, 0, 0))
    self.assertEqual(
      {
        'tag1:value1': {
          'registered_by': auth.Identity(kind='user', name='abc@example.com'),
          'registered_ts': datetime.datetime(2014, 1, 1, 0, 0),
          'tag': 'tag1:value1',
        },
      }, {k: e.to_dict() for k, e in attached.iteritems()})
    self.assertEqual('a/b', attached['tag1:value1'].package_name)
    self.assertEqual('a'*40, attached['tag1:value1'].instance_id)

    # Attempt to attach existing one (and one new).
    attached = self.service.attach_tags(
        package_name='a/b',
        instance_id='a'*40,
        tags=['tag1:value1', 'tag2:value2'],
        caller=auth.Identity.from_bytes('user:abc@example.com'),
        now=datetime.datetime(2015, 1, 1, 0, 0))
    self.assertEqual(
      {
        'tag1:value1': {
          'registered_by': auth.Identity(kind='user', name='abc@example.com'),
          # Didn't change to 2015.
          'registered_ts': datetime.datetime(2014, 1, 1, 0, 0),
          'tag': 'tag1:value1',
        },
        'tag2:value2': {
          'registered_by': auth.Identity(kind='user', name='abc@example.com'),
          'registered_ts': datetime.datetime(2015, 1, 1, 0, 0),
          'tag': 'tag2:value2',
        },
      }, {k: e.to_dict() for k, e in attached.iteritems()})

    # Get specific tags.
    tags = self.service.get_tags('a/b', 'a'*40, ['tag1:value1', 'missing:'])
    self.assertEqual(
      {
        'tag1:value1': {
          'registered_by': auth.Identity(kind='user', name='abc@example.com'),
          'registered_ts': datetime.datetime(2014, 1, 1, 0, 0),
          'tag': 'tag1:value1',
        },
        'missing:': None,
      }, {k: e.to_dict() if e else None for k, e in tags.iteritems()})

    # Get all tags. Newest first.
    tags = self.service.query_tags('a/b', 'a'*40)
    self.assertEqual(['tag2:value2', 'tag1:value1'], [t.tag for t in tags])

    # Search by specific tag (in a package).
    found = self.service.search_by_tag('tag1:value1', package_name='a/b')
    self.assertEqual(
        [('a/b', 'a'*40)], [(e.package_name, e.instance_id) for e in found])

    # Search by specific tag (globally). Use callback to cover this code path.
    found = self.service.search_by_tag('tag1:value1')
    self.assertEqual(
        [('a/b', 'a'*40)], [(e.package_name, e.instance_id) for e in found])

    # Cover callback usage.
    found = self.service.search_by_tag(
        'tag1:value1', callback=lambda *_a: False)
    self.assertFalse(found)

    # Remove tag, search again -> missing.
    self.service.detach_tags('a/b', 'a'*40, ['tag1:value1', 'missing:'])
    found = self.service.search_by_tag('tag1:value1')
    self.assertFalse(found)

  def add_tagged_instance(self, package_name, instance_id, tags):
    self.service.register_instance(
        package_name=package_name,
        instance_id=instance_id,
        caller=auth.Identity.from_bytes('user:abc@example.com'),
        now=datetime.datetime(2014, 1, 1, 0, 0))
    self.service.attach_tags(
        package_name=package_name,
        instance_id=instance_id,
        tags=tags,
        caller=auth.Identity.from_bytes('user:abc@example.com'),
        now=datetime.datetime(2014, 1, 1, 0, 0))

  def test_resolve_version(self):
    self.add_tagged_instance('a/b', 'a'*40, ['tag1:value1', 'tag2:value2'])
    self.add_tagged_instance('a/b', 'b'*40, ['tag1:value1'])
    self.add_tagged_instance('a/b', 'c'*40, ['tag1:value1'])
    self.service.set_package_ref(
        'a/b', 'ref', 'a'*40, auth.Identity.from_bytes('user:abc@example.com'))

    self.assertEqual([], self.service.resolve_version('a/b', 'd'*40, 2))
    self.assertEqual([], self.service.resolve_version('a/b', 'tag3:', 2))
    self.assertEqual([], self.service.resolve_version('a/b/c/d', 'a'*40, 2))
    self.assertEqual([], self.service.resolve_version('a/b', 'not-such-ref', 2))
    self.assertEqual(['a'*40], self.service.resolve_version('a/b', 'ref', 2))
    self.assertEqual(['a'*40], self.service.resolve_version('a/b', 'a'*40, 2))
    self.assertEqual(
        ['a'*40], self.service.resolve_version('a/b', 'tag2:value2', 2))

    # No order guarantees when multiple results match.
    res = self.service.resolve_version('a/b', 'tag1:value1', 2)
    self.assertEqual(2, len(res))
    self.assertTrue(set(['a'*40, 'b'*40, 'c'*40]).issuperset(res))

  def test_list_instances_success(self):
    now = datetime.datetime(2018, 1, 1, 0, 0)
    pkg = 'package/name'

    def mk(iid, ts, by='user:a@example.com', procs=0):
      inst = impl.PackageInstance(
          key=impl.package_instance_key(pkg, iid),
          registered_by=auth.Identity.from_bytes(by),
          registered_ts=now+datetime.timedelta(seconds=ts),
          processors_pending=['proc']*procs)
      inst.put()
      return inst

    # No instanced yet at all.
    res, cursor = self.service.list_instances(pkg)
    self.assertEqual([], res)
    self.assertIsNone(cursor)

    # Add a bunch of instances (all are ready).
    old = mk('a'*40, -5)
    fresh = mk('b'*40, 0)
    oldest = mk('c'*40, -10)

    def do_tests():
      # Returned in correct order.
      res, cursor = self.service.list_instances(pkg)
      self.assertEqual([fresh, old, oldest], res)
      self.assertIsNone(cursor)

      # Pagination works too.
      res, cursor = self.service.list_instances(pkg, limit=2)
      self.assertEqual([fresh, old], res)
      self.assertIsNotNone(cursor)
      res, cursor = self.service.list_instances(pkg, limit=2, cursor=cursor)
      self.assertEqual([oldest], res)
      self.assertIsNone(cursor)

    do_tests()

    # Add one more not-yet-ready instance.
    mk('d'*40, 5, procs=1)

    # The listing totally ignores it.
    do_tests()

  def test_list_instances_bad_cursor(self):
    with self.assertRaises(ValueError):
      self.service.list_instances('a/b', cursor='watsup')

  def test_read_missing_counter(self):
    counter = self.service.read_counter('a/b', 'a'*40, 'test.counter')
    self.assertEqual(0, counter.value)
    self.assertIsNone(counter.created_ts)
    self.assertIsNone(counter.updated_ts)

  def test_touch_counter(self):
    self.mock(utils, 'utcnow', lambda: datetime.datetime(2014, 1, 1))

    self.service.increment_counter('a/b', 'a'*40, 'test.counter', 0)
    counter = self.service.read_counter('a/b', 'a'*40, 'test.counter')
    self.assertEqual(0, counter.value)
    self.assertEqual(datetime.datetime(2014, 1, 1), counter.created_ts)
    self.assertEqual(datetime.datetime(2014, 1, 1), counter.updated_ts)

  def test_increment_counter_timestamps(self):
    self.mock(utils, 'utcnow', lambda: datetime.datetime(2014, 1, 1))
    self.service.increment_counter('a/b', 'a'*40, 'test.counter', 1)

    self.mock(utils, 'utcnow', lambda: datetime.datetime(2014, 1, 2))
    self.service.increment_counter('a/b', 'a'*40, 'test.counter', 1)

    self.mock(utils, 'utcnow', lambda: datetime.datetime(2014, 1, 3))
    self.service.increment_counter('a/b', 'a'*40, 'test.counter', 1)

    counter = self.service.read_counter('a/b', 'a'*40, 'test.counter')
    self.assertEqual(3, counter.value)
    self.assertEqual(datetime.datetime(2014, 1, 1), counter.created_ts)
    self.assertEqual(datetime.datetime(2014, 1, 3), counter.updated_ts)

  def test_increment_counter_same_shard(self):
    self.mock(utils, 'utcnow', lambda: datetime.datetime(2014, 1, 1))
    self.mock(random, 'randint', lambda a, b: 42)
    self.service.increment_counter('a/b', 'a'*40, 'test.counter', 1)

    self.mock(utils, 'utcnow', lambda: datetime.datetime(2014, 1, 2))
    self.service.increment_counter('a/b', 'a'*40, 'test.counter', 1)

    counter = self.service.read_counter('a/b', 'a'*40, 'test.counter')
    self.assertEqual(2, counter.value)
    self.assertEqual(datetime.datetime(2014, 1, 1), counter.created_ts)
    self.assertEqual(datetime.datetime(2014, 1, 2), counter.updated_ts)


class MockedCASService(object):
  def __init__(self):
    self.uploaded = {}

  def generate_fetch_url(self, algo, digest, filename=None):
    r = 'https://signed-url/%s/%s' % (algo, digest)
    if filename:
      r += '?filename=%s' % filename
    return r

  def is_object_present(self, algo, digest):
    return (algo, digest) in self.uploaded

  def create_upload_session(self, _algo, _digest, _caller):
    class UploadSession(object):
      upload_url = 'http://upload_url'
    return UploadSession(), 'upload_session_id'

  def open(self, hash_algo, hash_digest, read_buffer_size):
    assert read_buffer_size > 0
    if not self.is_object_present(hash_algo, hash_digest):  # pragma: no cover
      raise cas_impl.NotFoundError()
    return StringIO.StringIO(self.uploaded[(hash_algo, hash_digest)])

  def start_direct_upload(self, hash_algo):
    assert hash_algo == 'SHA1'
    return cas_impl.DirectUpload(
        file_obj=StringIO.StringIO(),
        hasher=hashlib.sha1(),
        callback=lambda *_args: None)


class MockedProcessor(processing.Processor):
  def __init__(self, name, error=None):
    self.name = name
    self.error = error

  def should_process(self, instance):
    return True

  def run(self, instance, data):
    if self.error:
      raise processing.ProcessingError(self.error)
    return {
      'instance_id': instance.instance_id,
      'package_name': instance.package_name,
      'processor_name': self.name,
    }
