| # Copyright 2017 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import datetime |
| import json |
| import mock |
| import unittest |
| from mock import patch |
| |
| from google.protobuf import duration_pb2 |
| from google.protobuf import empty_pb2 |
| from google.protobuf import struct_pb2 |
| from google.protobuf import timestamp_pb2 |
| |
| from infra_libs import bqh |
| from infra_libs.test import testmessage_pb2 |
| |
| |
| class TestBigQueryHelper(unittest.TestCase): |
| def setUp(self): |
| super(TestBigQueryHelper, self).setUp() |
| self.bq_client = mock.Mock() |
| |
| self.dataset_id = 'test_dataset' |
| self.table_id = 'test_table' |
| self.table = self.bq_client.get_table( |
| self.bq_client.dataset(self.dataset_id).table(self.table_id)) |
| self.mock_create_rows = self.bq_client.create_rows |
| self.mock_create_rows.return_value = None |
| |
| def test_send_rows_tuple(self): |
| rows = [('a',), ('b',), ('c',)] |
| bqh.send_rows(self.bq_client, self.dataset_id, self.table_id, rows) |
| self.mock_create_rows.assert_any_call(self.table, rows) |
| |
| def test_send_iterable_rows(self): |
| rows = [('a',), ('b',), ('c',)] |
| bqh.send_rows(self.bq_client, self.dataset_id, self.table_id, iter(rows)) |
| self.mock_create_rows.assert_any_call(self.table, rows) |
| |
| def test_batch_sizes(self): |
| rows = [('a',), ('b',), ('c',)] |
| bqh.send_rows(self.bq_client, self.dataset_id, self.table_id, rows, 0) |
| self.mock_create_rows.assert_any_call(self.table, rows) |
| bqh.send_rows(self.bq_client, self.dataset_id, self.table_id, rows, 1) |
| self.mock_create_rows.assert_any_call(self.table, [('a',)]) |
| self.mock_create_rows.assert_any_call(self.table, [('b',)]) |
| self.mock_create_rows.assert_any_call(self.table, [('c',)]) |
| bqh.send_rows(self.bq_client, self.dataset_id, self.table_id, rows, |
| bqh._BATCH_LIMIT+1) |
| self.mock_create_rows.assert_any_call(self.table, rows) |
| |
| def test_send_rows_unsupported_type(self): |
| with self.assertRaises(bqh.UnsupportedTypeError): |
| bqh.send_rows(self.bq_client, self.dataset_id, self.table_id, [{}]) |
| |
| def test_send_rows_message(self): |
| rows = [testmessage_pb2.TestMessage(str='a')] |
| bqh.send_rows(self.bq_client, self.dataset_id, self.table_id, rows) |
| expected_rows_arg = [{'num': 0, 'e': 'E0', 'str': u'a'}] |
| self.mock_create_rows.assert_any_call(self.table, expected_rows_arg) |
| |
| def test_send_rows_with_errors(self): |
| rows = [('a',), ('b',), ('c',)] |
| self.mock_create_rows.return_value = [ |
| { |
| 'index': 0, |
| 'errors': ['some err'], |
| }, |
| ] |
| with self.assertRaises(bqh.BigQueryInsertError): |
| bqh.send_rows(self.bq_client, self.dataset_id, self.table_id, rows) |
| |
| def test_message_to_dict(self): |
| struct0 = struct_pb2.Struct() |
| struct0['a'] = 0 |
| struct1 = struct_pb2.Struct() |
| struct1['a'] = 1 |
| |
| dt0 = datetime.datetime(2018, 2, 20) |
| dt1 = datetime.datetime(2018, 2, 21) |
| ts0 = timestamp_pb2.Timestamp() |
| ts0.FromDatetime(dt0) |
| ts1 = timestamp_pb2.Timestamp() |
| ts1.FromDatetime(dt1) |
| dur = duration_pb2.Duration() |
| dur.FromTimedelta(datetime.timedelta(seconds=2, microseconds=3)) |
| |
| mmap_hello=testmessage_pb2.NestedMessage(num=100) |
| mmap_world=testmessage_pb2.NestedMessage(str='yo.') |
| |
| msg = testmessage_pb2.TestMessage( |
| str='a', |
| strs=['a', 'b'], |
| |
| num=1, |
| nums=[0, 1, 2], |
| |
| nested=testmessage_pb2.NestedMessage(num=1, str='a'), |
| nesteds=[ |
| testmessage_pb2.NestedMessage(num=1, str='a'), |
| testmessage_pb2.NestedMessage(num=2, str='b'), |
| ], |
| |
| empty=empty_pb2.Empty(), |
| empties=[empty_pb2.Empty(), empty_pb2.Empty()], |
| |
| e=testmessage_pb2.E1, |
| es=[testmessage_pb2.E0, testmessage_pb2.E2], |
| |
| struct=struct0, |
| structs=[struct0, struct1], |
| |
| timestamp=ts0, |
| timestamps=[ts0, ts1], |
| |
| repeated_container=testmessage_pb2.RepeatedContainer(nums=[1, 2]), |
| duration=dur, |
| |
| enum_map={ |
| 'hello': testmessage_pb2.E1, |
| 'world': testmessage_pb2.E2, |
| }, |
| scalar_map={'hello': 1, 'world': 2}, |
| message_map={'hello': mmap_hello, 'world': mmap_world}, |
| ) |
| row = bqh.message_to_dict(msg) |
| |
| expected = { |
| 'str': u'a', |
| 'strs': [u'a', u'b'], |
| 'num': 1, |
| 'nums': [0, 1, 2], |
| 'nested': { |
| 'num': 1, |
| 'str': u'a' |
| }, |
| 'nesteds': [ |
| { |
| 'num': 1, |
| 'str': u'a' |
| }, |
| { |
| 'num': 2, |
| 'str': u'b' |
| }, |
| ], |
| |
| # empty messages are omitted |
| 'e': 'E1', |
| 'es': ['E0', 'E2'], |
| |
| # structs are compared separately. |
| 'timestamp': dt0.isoformat(), |
| 'timestamps': [dt0.isoformat(), dt1.isoformat()], |
| 'repeated_container': { |
| 'nums': [1, 2] |
| }, |
| 'duration': 2.000003, |
| |
| 'enum_map': [ |
| {'key': u'hello', 'value': 'E1'}, |
| {'key': u'world', 'value': 'E2'}, |
| ], |
| 'scalar_map': [ |
| {'key': u'hello', 'value': 1}, |
| {'key': u'world', 'value': 2}, |
| ], |
| 'message_map': [ |
| {'key': u'hello', 'value': {'num': 100, 'str': u''}}, |
| {'key': u'world', 'value': {'num': 0, 'str': u'yo.'}}, |
| ], |
| } |
| |
| # compare structs as JSON values, not strings. |
| self.assertEqual(json.loads(row.pop('struct')), {'a': 0}) |
| self.assertEqual( |
| [json.loads(s) for s in row.pop('structs')], |
| [{'a': 0}, {'a': 1}] |
| ) |
| |
| self.assertEqual(row, expected) |
| |
| def test_message_to_dict_empty(self): |
| row = bqh.message_to_dict(testmessage_pb2.TestMessage()) |
| expected = {'e': 'E0', 'str': u'', 'num': 0} |
| self.assertEqual(row, expected) |
| |
| def test_message_to_dict_repeated_container_with_no_elems(self): |
| row = bqh.message_to_dict(testmessage_pb2.TestMessage( |
| repeated_container=testmessage_pb2.RepeatedContainer())) |
| self.assertNotIn('repeated_container', row) |
| |
| def test_message_to_dict_invalid_enum(self): |
| with self.assertRaisesRegexp( |
| ValueError, '^Invalid value -1 for enum type bigquery.E$'): |
| bqh.message_to_dict(testmessage_pb2.TestMessage(e=-1)) |
| |
| def test_message_to_dict_omit_null(self): |
| with self.assertRaisesRegexp( |
| ValueError, '^Invalid value -1 for enum type bigquery.E$'): |
| bqh.message_to_dict(testmessage_pb2.TestMessage(e=-1)) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |