blob: 1b688eba0f89e675a138ed542eaf92848e8fb1e9 [file] [log] [blame]
#!/usr/bin/env python2.5
#
# Copyright 2008 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""These tests run against the prod safebrowsing servers."""
import sblist
import server
import base64
import cgi
import hmac
import logging
import sha
import StringIO
import sys
import unittest
import urlparse
class HTTPError(Exception):
"""Fake HTTP error used to test gethash requests that return a 204."""
def __init__(self, code):
self.code = code
def __str__(self):
return 'HTTPError code:%d' % self.code
class FakeServer(object):
"""Helper class which simulates the SafeBrowsing server."""
def __init__(self):
# array of (url prefix, exp. params, request, response data or exception)
self._responses = []
def SetResponse(self, url_prefix, params, request, response):
"""Set a fake response for a particular request.
If a request comes in to the fake server with a url that matches the given
url_prefix and a request body that matches the given request body and the
given params are a subset of the request CGI arguments the fake server will
serve the given baked response or raise an exception if the response is an
exception object.
Args:
url_prefix: url prefix that has to match for the response to be sent.
params: sub-set of CGI parameters that have to be present for the request
to be valid and the response to be sent. Can be None.
Format: [(arg1, value1), (arg2, value2), ...].
request: request body that has to be set for the response to be sent.
Can be None if no request body is expected for a particular
request.
response: Response data to send or exception to raise if the conditions
above are met.
"""
self._responses.append((url_prefix, params, request, response))
def _HasExpectedParams(self, url, expected_params):
"""Returns true if the expected CGI parameters are set in the given URL."""
if expected_params:
actual_params = cgi.parse_qs(urlparse.urlparse(url)[4])
for key, value in expected_params:
if key not in actual_params or actual_params[key][0] != value:
return False
return True
def HandleRequest(self, url, data):
for url_prefix, params, request, response in self._responses:
if (url.startswith(url_prefix) and
request == data and
self._HasExpectedParams(url, params)):
if isinstance(response, Exception):
raise response
else:
return StringIO.StringIO(response)
raise Exception('No such data: %s' % url)
class ServerTest(unittest.TestCase):
def setUp(self):
self._fake_sb_server = FakeServer()
self._server = server.Server(
('safebrowsing.clients.google.com', 80),
('sb-ssl.google.com', 443),
'/safebrowsing',
clientkey="BOGUS_CLIENT_KEY",
wrkey="BOGUS_WRAPPED_KEY",
apikey="BOGUS_API_KEY",
url_request_function=self._fake_sb_server.HandleRequest)
self._base_url = 'http://safebrowsing.clients.google.com:80/safebrowsing'
def _Mac(self, data):
clientkey, wrkey = self._server.Keys()
return base64.urlsafe_b64encode(hmac.new(clientkey, data, sha).digest())
def testGetLists(self):
response = 'lista\nlistb\nlistc'
response = '%s\n%s' % (self._Mac(response), response)
self._fake_sb_server.SetResponse(url_prefix='%s/list?' % self._base_url,
params=[('wrkey', 'BOGUS_WRAPPED_KEY'),
('apikey', 'BOGUS_API_KEY')],
request=None,
response=response)
self.assertEqual(['lista', 'listb', 'listc'],
[l.Name() for l in self._server.GetLists()])
def testKeys(self):
self.assertEqual(('BOGUS_CLIENT_KEY', 'BOGUS_WRAPPED_KEY'),
self._server.Keys())
def testRekey(self):
self._fake_sb_server.SetResponse(
url_prefix='https://sb-ssl.google.com:443/safebrowsing/newkey?',
params=None,
request=None,
response=('clientkey:28:TkVXX0JPR1VTX0NMSUVOVF9LRVk=\n' +
'wrappedkey:15:NEW_BOGUS_WRKEY'))
self.assertEqual(('NEW_BOGUS_CLIENT_KEY', 'NEW_BOGUS_WRKEY'),
self._server.Rekey())
self.assertEqual(('NEW_BOGUS_CLIENT_KEY', 'NEW_BOGUS_WRKEY'),
self._server.Keys())
def testDownload(self):
# First we setup the redirect requests.
lista_a_redirect_response = ('a:10:4:27\n' +
'1234\x00' +
'5678\x01ABCD' +
'EFGH\x02EFGHIJKL' +
# Empty add chunk.
'a:7:4:0\n')
lista_s_redirect_response = ('s:2:4:22\n' +
'5678\x01\x00\x00\x00\x0AABCD' +
# Special case where there is no prefix.
'EFGH\x00\x00\x00\x00\x0A' +
# Empty sub chunk
's:3:4:0\n')
listb_a_redirect_response = (
'a:1:6:1546\n' +
# Test an edge case where there are more than 255 entries for
# the same host key
'1234\xFF%s' % ''.join(map(str, range(100000, 100255))) +
'1234\x01100255')
self._fake_sb_server.SetResponse(
url_prefix='http://rd.com/lista-a',
params=None,
request=None,
response=lista_a_redirect_response)
self._fake_sb_server.SetResponse(
url_prefix='http://rd.com/lista-s',
params=None,
request=None,
response=lista_s_redirect_response)
self._fake_sb_server.SetResponse(
url_prefix='http://rd.com/listb-a',
params=None,
request=None,
# Make sure we can handle prefixes that are >4B.
response=listb_a_redirect_response)
response = '\n'.join(['n:1800',
'i:lista',
'u:rd.com/lista-s,%s' %
self._Mac(lista_s_redirect_response),
'u:rd.com/lista-a,%s' %
self._Mac(lista_a_redirect_response),
'ad:1-2,4-5,7',
'i:listb',
'u:rd.com/listb-a,%s' %
self._Mac(listb_a_redirect_response),
'sd:2-6'])
self._fake_sb_server.SetResponse(
url_prefix='%s/downloads?' % self._base_url,
params=[('wrkey', 'BOGUS_WRAPPED_KEY'),
('apikey', 'BOGUS_API_KEY')],
request='s;%d\nlista;mac\nlistb;mac\n' % (1<<10),
response='m:%s\n%s' % (self._Mac(response), response))
# Perform the actual download request.
sblists = [sblist.List('lista'), sblist.List('listb')]
response = self._server.Download(sblists, 1<<20)
#### Test that the download response contains the correct list ops ####
self.assertEqual(1800, response.min_delay_sec)
self.assertFalse(response.rekey)
self.assertFalse(response.reset)
self.assertEqual(['lista', 'listb'], response.listops.keys())
self.assertTrue(isinstance(response.listops['lista'][0], server.SubChunk))
self.assertEqual(2, response.listops['lista'][0]._chunknum)
self.assertEqual(4, response.listops['lista'][0]._prefixlen)
self.assertEqual([('ABCD', 10), ('EFGH', 10)],
response.listops['lista'][0]._prefixes)
self.assertTrue(isinstance(response.listops['lista'][1],
server.EmptySubChunks))
self.assertEqual([3], response.listops['lista'][1]._chunknums)
self.assertTrue(isinstance(response.listops['lista'][2], server.AddChunk))
self.assertEqual(10, response.listops['lista'][2]._chunknum)
self.assertEqual(4, response.listops['lista'][2]._prefixlen)
self.assertEqual(['1234', 'ABCD', 'EFGH', 'IJKL'],
response.listops['lista'][2]._prefixes)
self.assertTrue(isinstance(response.listops['lista'][3],
server.EmptyAddChunks))
self.assertEqual([7], response.listops['lista'][3]._chunknums)
self.assertTrue(isinstance(response.listops['lista'][4], server.AddDel))
self.assertEqual([1, 2, 4, 5, 7],
list(response.listops['lista'][4]._chunknums))
self.assertTrue(isinstance(response.listops['listb'][0], server.AddChunk))
self.assertEqual(1, response.listops['listb'][0]._chunknum)
self.assertEqual(6, response.listops['listb'][0]._prefixlen)
self.assertEqual(map(str, range(100000, 100256)),
response.listops['listb'][0]._prefixes)
self.assertTrue(isinstance(response.listops['listb'][1], server.SubDel))
self.assertEqual([2, 3, 4, 5, 6],
list(response.listops['listb'][1]._chunknums))
def testGetFullHashes(self):
response = 'lista:123:32\n89AB%s' % (28 * 'A')
self._fake_sb_server.SetResponse(
url_prefix='%s/gethash?' % self._base_url,
params=[('wrkey', 'BOGUS_WRAPPED_KEY'),
('apikey', 'BOGUS_API_KEY')],
request='4:12\n0123456789AB',
response='%s\n%s' % (self._Mac(response), response))
self._fake_sb_server.SetResponse(
url_prefix='%s/gethash?' % self._base_url,
params=[('wrkey', 'BOGUS_WRAPPED_KEY'),
('apikey', 'BOGUS_API_KEY')],
request='10:10\n0123456789',
response=HTTPError(204))
resp = self._server.GetFullHashes(['0123', '4567', '89AB'], 4)
self.assertTrue(isinstance(resp, server.GetHashResponse))
self.assertFalse(resp.rekey)
self.assertEqual({'lista': { 123: set(['89AB%s' % (28 * 'A')])}},
resp.listmap)
resp = self._server.GetFullHashes(['0123456789'], 10)
self.assertTrue(isinstance(resp, server.GetHashResponse))
self.assertFalse(resp.rekey)
self.assertEqual({}, resp.listmap)
def testGetSequence(self):
chunkseq = server.Server._GetSequence('1-2,4-5,7-10,11')
expected = [1, 2, 4, 5, 7, 8, 9, 10, 11]
# Should be able to iterate over chunkseq multiple times.
for i in xrange(0, 5):
self.assertEqual(expected, list(chunkseq))
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
unittest.main()