blob: ecec6c6fd72f6314d71e00125650634d825b8199 [file] [log] [blame]
from logging import getLogger
from time import sleep
from unittest.mock import patch
import pytest
from gridfs import GridFS
from gridfs.errors import CorruptGridFile, FileExists
from pymongo import MongoClient
from requests_cache._utils import get_valid_kwargs
from requests_cache.backends import (
GridFSCache,
GridFSPickleDict,
MongoCache,
MongoDict,
MongoPickleDict,
)
from requests_cache.expiration import NEVER_EXPIRE
from requests_cache.serializers import bson_document_serializer
from tests.conftest import HTTPBIN_FORMATS, HTTPBIN_METHODS, fail_if_no_connection, httpbin
from tests.integration.base_cache_test import TEST_SERIALIZERS, BaseCacheTest
from tests.integration.base_storage_test import BaseStorageTest
# Add extra MongoDB-specific format to list of serializers to test against
MONGODB_SERIALIZERS = [bson_document_serializer] + list(TEST_SERIALIZERS.values())
logger = getLogger(__name__)
@pytest.fixture(scope='module', autouse=True)
@fail_if_no_connection(connect_timeout=2)
def ensure_connection():
"""Fail all tests in this module if MongoDB is not running"""
from pymongo import MongoClient
client = MongoClient(serverSelectionTimeoutMS=2000)
client.server_info()
class TestMongoDict(BaseStorageTest):
storage_class = MongoDict
class TestMongoPickleDict(BaseStorageTest):
storage_class = MongoPickleDict
picklable = True
@patch('requests_cache.backends.mongodb.MongoClient')
@patch(
'requests_cache.backends.mongodb.get_valid_kwargs',
side_effect=lambda cls, kwargs: get_valid_kwargs(MongoClient, kwargs),
)
def test_connection_kwargs(self, mock_get_valid_kwargs, mock_client):
"""A spot check to make sure optional connection kwargs gets passed to connection"""
MongoDict('test', host='http://0.0.0.0', port=1234, invalid_kwarg='???')
mock_client.assert_called_with(host='http://0.0.0.0', port=1234)
class TestMongoCache(BaseCacheTest):
backend_class = MongoCache
document_support = True
init_kwargs = {'serializer': None} # Use class default serializer instead of pickle
@pytest.mark.parametrize('serializer', MONGODB_SERIALIZERS)
@pytest.mark.parametrize('method', HTTPBIN_METHODS)
@pytest.mark.parametrize('field', ['params', 'data', 'json'])
def test_all_methods(self, field, method, serializer):
super().test_all_methods(field, method, serializer)
@pytest.mark.parametrize('serializer', MONGODB_SERIALIZERS)
@pytest.mark.parametrize('response_format', HTTPBIN_FORMATS)
def test_all_response_formats(self, response_format, serializer):
super().test_all_response_formats(response_format, serializer)
def test_ttl(self):
session = self.init_session()
session.cache.set_ttl(1)
session.get(httpbin('get'))
response = session.get(httpbin('get'))
assert response.from_cache is True
# Wait up to 60 seconds for removal background process to run
# Unfortunately there doesn't seem to be a way to manually trigger it
for i in range(60):
if response.cache_key not in session.cache.responses:
logger.debug(f'Removed {response.cache_key} after {i} seconds')
break
sleep(1)
assert response.cache_key not in session.cache.responses
def test_ttl__overwrite(self):
session = self.init_session()
session.cache.set_ttl(60)
# Should have no effect
session.cache.set_ttl(360)
assert session.cache.get_ttl() == 60
# Should create new index
session.cache.set_ttl(360, overwrite=True)
assert session.cache.get_ttl() == 360
# Should drop index
session.cache.set_ttl(None, overwrite=True)
assert session.cache.get_ttl() is None
# Should attempt to drop non-existent index and ignore error
session.cache.set_ttl(NEVER_EXPIRE, overwrite=True)
assert session.cache.get_ttl() is None
class TestGridFSPickleDict(BaseStorageTest):
storage_class = GridFSPickleDict
picklable = True
num_instances = 1 # Only test a single collecton instead of multiple
@patch('requests_cache.backends.gridfs.GridFS')
@patch('requests_cache.backends.gridfs.MongoClient')
@patch(
'requests_cache.backends.gridfs.get_valid_kwargs',
side_effect=lambda cls, kwargs: get_valid_kwargs(MongoClient, kwargs),
)
def test_connection_kwargs(self, mock_get_valid_kwargs, mock_client, mock_gridfs):
"""A spot check to make sure optional connection kwargs gets passed to connection"""
GridFSPickleDict('test', host='http://0.0.0.0', port=1234, invalid_kwarg='???')
mock_client.assert_called_with(host='http://0.0.0.0', port=1234)
def test_corrupt_file(self):
"""A corrupted file should be handled and raise a KeyError instead"""
cache = self.init_cache()
cache['key'] = 'value'
with pytest.raises(KeyError), patch.object(GridFS, 'find_one', side_effect=CorruptGridFile):
cache['key']
def test_file_exists(self):
cache = self.init_cache()
# This write should just quiety fail
with patch.object(GridFS, 'put', side_effect=FileExists):
cache['key'] = 'value_1'
assert 'key' not in cache
class TestGridFSCache(BaseCacheTest):
backend_class = GridFSCache
document_support = False