blob: 683e0e97d12a06dd380c8c350aed71b2e8870c26 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import threading
import unittest
import concurrent
def _ForkTestHelper(arg1, arg2, pickle_me_not, test_instance, parent_pid):
_ = pickle_me_not # Suppress lint warning.
test_instance.assertNotEquals(os.getpid(), parent_pid)
return arg1 + arg2
class Unpicklable(object):
    """Argument placeholder that fails loudly if something tries to pickle it.

    Passing an instance as a call parameter lets a test detect any code path
    that serializes its arguments with pickle.
    """

    def __getstate__(self):
        # pickle asks an instance for its state through __getstate__, so
        # raising here turns any pickling attempt into an immediate error.
        failure = AssertionError('Tried to pickle')
        raise failure
class ConcurrentTest(unittest.TestCase):
    """Tests for the dict-of-lists codec and thread/fork helpers of concurrent.

    Uses assertEqual / range / next() rather than the deprecated or
    Python-2-only spellings so the suite runs on both Python 2.7 and Python 3.
    """

    def testEncodeDictOfLists_Empty(self):
        """An empty dict survives an encode/decode round trip."""
        test_dict = {}
        encoded = concurrent.EncodeDictOfLists(test_dict)
        decoded = concurrent.DecodeDictOfLists(encoded)
        self.assertEqual(test_dict, decoded)

    def testEncodeDictOfLists_EmptyValue(self):
        """A key mapped to an empty list survives a round trip."""
        test_dict = {'foo': []}
        encoded = concurrent.EncodeDictOfLists(test_dict)
        decoded = concurrent.DecodeDictOfLists(encoded)
        self.assertEqual(test_dict, decoded)

    def testEncodeDictOfLists_AllStrings(self):
        """String keys with lists of strings survive a round trip."""
        test_dict = {'foo': ['a', 'b', 'c'], 'foo2': ['a', 'b']}
        encoded = concurrent.EncodeDictOfLists(test_dict)
        decoded = concurrent.DecodeDictOfLists(encoded)
        self.assertEqual(test_dict, decoded)

    def testEncodeDictOfLists_KeyTransform(self):
        """Int keys round-trip when encoded with str and decoded with int."""
        test_dict = {0: ['a', 'b', 'c'], 9: ['a', 'b']}
        encoded = concurrent.EncodeDictOfLists(test_dict, key_transform=str)
        decoded = concurrent.DecodeDictOfLists(encoded, key_transform=int)
        self.assertEqual(test_dict, decoded)

    def testEncodeDictOfLists_ValueTransform(self):
        """value_transform=int converts every decoded list item to an int."""
        test_dict = {'a': ['0', '1', '2'], 'b': ['3', '4']}
        expected = {'a': [0, 1, 2], 'b': [3, 4]}
        encoded = concurrent.EncodeDictOfLists(test_dict)
        decoded = concurrent.DecodeDictOfLists(encoded, value_transform=int)
        self.assertEqual(expected, decoded)

    def testEncodeDictOfLists_Join_Empty(self):
        """Joining two encoded empty dicts decodes to an empty dict."""
        test_dict1 = {}
        test_dict2 = {}
        expected = {}
        encoded1 = concurrent.EncodeDictOfLists(test_dict1)
        encoded2 = concurrent.EncodeDictOfLists(test_dict2)
        encoded = concurrent.JoinEncodedDictOfLists([encoded1, encoded2])
        decoded = concurrent.DecodeDictOfLists(encoded)
        self.assertEqual(expected, decoded)

    def testEncodeDictOfLists_Join_Singl(self):
        """Joining a single encoded dict decodes back to that same dict."""
        test_dict1 = {'key1': ['a']}
        encoded1 = concurrent.EncodeDictOfLists(test_dict1)
        encoded = concurrent.JoinEncodedDictOfLists([encoded1])
        decoded = concurrent.DecodeDictOfLists(encoded)
        self.assertEqual(test_dict1, decoded)

    def testEncodeDictOfLists_JoinMultiple(self):
        """Joining several encodings (one of them empty) merges their keys."""
        test_dict1 = {'key1': ['a']}
        test_dict2 = {'key2': ['b']}
        expected = {'key1': ['a'], 'key2': ['b']}
        encoded1 = concurrent.EncodeDictOfLists(test_dict1)
        encoded2 = concurrent.EncodeDictOfLists({})
        encoded3 = concurrent.EncodeDictOfLists(test_dict2)
        encoded = concurrent.JoinEncodedDictOfLists(
            [encoded1, encoded2, encoded3])
        decoded = concurrent.DecodeDictOfLists(encoded)
        self.assertEqual(expected, decoded)

    def testCallOnThread(self):
        """CallOnThread forwards args/kwargs and result.get() returns the value.

        The callback also checks that it is not running on the thread that
        started the test.
        """
        main_thread = threading.current_thread()

        def callback(arg1, arg2):
            self.assertEqual(1, arg1)
            self.assertEqual(2, arg2)
            my_thread = threading.current_thread()
            self.assertNotEqual(my_thread, main_thread)
            return 3

        result = concurrent.CallOnThread(callback, 1, arg2=2)
        self.assertEqual(3, result.get())

    def testForkAndCall_normal(self):
        """ForkAndCall returns the helper's sum; args include an Unpicklable."""
        parent_pid = os.getpid()
        result = concurrent.ForkAndCall(
            _ForkTestHelper, (1, 2, Unpicklable(), self, parent_pid))
        self.assertEqual(3, result.get())

    def testForkAndCall_exception(self):
        """A TypeError from the helper (1 + 'a') surfaces via result.get()."""
        parent_pid = os.getpid()
        result = concurrent.ForkAndCall(
            _ForkTestHelper, (1, 'a', None, self, parent_pid))
        self.assertRaises(TypeError, result.get)

    def testBulkForkAndCall_none(self):
        """An empty argument list yields no results."""
        results = concurrent.BulkForkAndCall(_ForkTestHelper, [])
        self.assertEqual([], list(results))

    def testBulkForkAndCall_few(self):
        """Two positional argument tuples yield the two expected sums."""
        parent_pid = os.getpid()
        results = concurrent.BulkForkAndCall(_ForkTestHelper, [
            (1, 2, Unpicklable(), self, parent_pid),
            (3, 4, None, self, parent_pid)])
        # Compared as sets: this test does not depend on result ordering.
        self.assertEqual({3, 7}, set(results))

    def testBulkForkAndCall_few_kwargs(self):
        """Same as the 'few' case, with shared parameters passed as kwargs."""
        parent_pid = os.getpid()
        results = concurrent.BulkForkAndCall(
            _ForkTestHelper,
            [(1, 2, Unpicklable()), (3, 4, None)],
            test_instance=self,
            parent_pid=parent_pid)
        self.assertEqual({3, 7}, set(results))

    def testBulkForkAndCall_many(self):
        """One hundred identical calls each yield 3."""
        parent_pid = os.getpid()
        # range() instead of xrange(): identical iteration, valid on 2 and 3.
        args = [(1, 2, Unpicklable(), self, parent_pid) for _ in range(100)]
        results = concurrent.BulkForkAndCall(_ForkTestHelper, args)
        self.assertEqual([3] * 100, list(results))

    def testBulkForkAndCall_many_kwargs(self):
        """One hundred calls with every shared parameter passed as a kwarg."""
        parent_pid = os.getpid()
        args = [(1, 2) for _ in range(100)]
        results = concurrent.BulkForkAndCall(
            _ForkTestHelper,
            args,
            pickle_me_not=Unpicklable(),
            test_instance=self,
            parent_pid=parent_pid)
        self.assertEqual([3] * 100, list(results))

    def testBulkForkAndCall_exception(self):
        """A TypeError from the helper (1 + 'a') surfaces when iterating."""
        parent_pid = os.getpid()
        # The tuple must fill all five positional parameters of
        # _ForkTestHelper; with only four, the TypeError would come from the
        # missing argument instead of from the 1 + 'a' addition under test.
        results = concurrent.BulkForkAndCall(_ForkTestHelper, [
            (1, 'a', None, self, parent_pid)])
        # The next() builtin works with both the Python 2 and Python 3
        # iterator protocols, unlike the bound results.next method.
        self.assertRaises(TypeError, next, results)
# Run the test suite only when this file is executed directly as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    unittest.main()