| import os |
| import unittest |
| |
| from test.support import import_helper, threading_helper |
| from test.support.threading_helper import run_concurrently |
| from uuid import SafeUUID |
| |
| c_uuid = import_helper.import_module("_uuid") |
| |
# Stress-test sizing: NTHREADS is the thread count handed to
# run_concurrently(), and UUID_PER_THREAD is how many UUIDs each worker
# generates per test.
| NTHREADS = 10 |
| UUID_PER_THREAD = 1000 |
| |
| |
@threading_helper.requires_working_threading()
class UUIDTests(unittest.TestCase):
    """Generate UUIDs from several threads at once and check for duplicates.

    Each test runs NTHREADS workers concurrently; every worker generates
    UUID_PER_THREAD UUIDs through the ``_uuid`` extension module, validates
    the type and length of each one, and contributes its results to a shared
    list that is checked for duplicates once all workers have finished.
    """

    @unittest.skipUnless(os.name == "posix", "POSIX only")
    def test_generate_time_safe(self):
        # Concurrent calls to c_uuid.generate_time_safe() must never hand
        # out the same "safe" UUID twice.
        uuids = []

        def worker():
            # Accumulate locally and merge once at the end so the shared
            # list is touched a single time per thread.
            local_uuids = []
            for _ in range(UUID_PER_THREAD):
                uuid, is_safe = c_uuid.generate_time_safe()
                self.assertIs(type(uuid), bytes)
                self.assertEqual(len(uuid), 16)
                # Collect the UUID only if it is safe. If not, we cannot ensure
                # UUID uniqueness. According to uuid_generate_time_safe() man
                # page, it is theoretically possible for two concurrently
                # running processes to generate the same UUID(s) if the return
                # value is not 0.
                #
                # SafeUUID is a plain Enum, so its members never compare equal
                # to non-enum values. is_safe is presumably the raw status
                # from the C helper rather than a SafeUUID member (confirm
                # against _uuid), so compare with the member's value;
                # comparing with the member itself would collect nothing and
                # make the uniqueness check below pass vacuously.
                if is_safe == SafeUUID.safe.value:
                    local_uuids.append(uuid)

            # Merge all safe uuids
            uuids.extend(local_uuids)

        run_concurrently(worker_func=worker, nthreads=NTHREADS)
        # Any duplicate collapses in the set and makes the lengths differ.
        self.assertEqual(len(uuids), len(set(uuids)))

    @unittest.skipUnless(os.name == "nt", "Windows only")
    def test_UuidCreate(self):
        # Concurrent calls to c_uuid.UuidCreate() must never return the
        # same UUID twice.
        uuids = []

        def worker():
            # Accumulate locally and merge once at the end so the shared
            # list is touched a single time per thread.
            local_uuids = []
            for _ in range(UUID_PER_THREAD):
                uuid = c_uuid.UuidCreate()
                self.assertIs(type(uuid), bytes)
                self.assertEqual(len(uuid), 16)
                local_uuids.append(uuid)

            # Merge all uuids
            uuids.extend(local_uuids)

        run_concurrently(worker_func=worker, nthreads=NTHREADS)
        # Any duplicate collapses in the set and makes the lengths differ.
        self.assertEqual(len(uuids), len(set(uuids)))
| |
| |
# Run the tests in this module when the file is executed directly.
| if __name__ == "__main__": |
| unittest.main() |