| # Copyright 2018 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """lib of Google Cloud KMS, an encryption key management system on GCP. |
| |
| Used for encrypt/decrypt credentials, e.g. API keys. |
| """ |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import base64 |
| import collections |
| |
| from google.oauth2 import service_account # pylint: disable=import-error,no-name-in-module |
| from googleapiclient import discovery |
| |
| from test_analyzer import configs |
| |
| |
| # A KMS Crypto Key's information, including |
| # project_id: a string, refers to the project that registers KMS, |
| # location_id: a string, refers to the geographical location where the |
| # cryptographic keys are stored. |
| # key_ring_id: a string, refers to the id of key ring, a concept of a |
| # grouping of keys. |
| # crypto_key_id: a string, refers to the id of a cryptographic key. |
| KMSCryptoKey = collections.namedtuple( |
| 'KMSCryptoKEY', |
| [ |
| 'project_id', |
| 'location_id', |
| 'key_ring_id', |
| 'crypto_key_id', |
| ]) |
| DEFAULT_CRYPTO_KEY_FOR_API = KMSCryptoKey( |
| project_id=configs.PROJECT_ID_PROD, |
| location_id='global', |
| key_ring_id='creds', |
| crypto_key_id='apikey') |
| |
| |
| def _CreateKMSClient(): |
| """Create a KMS Client to connect to cloudkms service. |
| |
| Returns: |
| A discovery.build client for connecting to cloudkms. |
| """ |
| # We will only kms crypte key built in prod instance to encrypt/decrypt. |
| secret_json = configs.GetServiceAccountCredentials(is_stage=False) |
| credentials = service_account.Credentials.from_service_account_file( |
| secret_json) |
| return discovery.build('cloudkms', 'v1', credentials=credentials) |
| |
| |
| def _CreateResourceName(crypto_key_info): |
| """Create a resource name to encrypt/decrypt. |
| |
| Args: |
| crypto_key_info: A KMSCryptoKey object, including all required key infos. |
| """ |
| return 'projects/%s/locations/%s/keyRings/%s/cryptoKeys/%s' % ( |
| crypto_key_info.project_id, crypto_key_info.location_id, |
| crypto_key_info.key_ring_id, crypto_key_info.crypto_key_id) |
| |
| |
| def Encrypt(crypto_key_info, plaintext): |
| """Encrypts data with the provided CryptoKey. |
| |
| This function encrypts plaintext using the provided CryptoKey. It can only be |
| recovered with Decrypt(). |
| |
| Args: |
| crypto_key_info: A KMSCryptoKey object, including all required key infos. |
| plaintext: The string plain text to be encrypted. |
| |
| Returns: |
| A string cipher text in bytes. |
| """ |
| kms_client = _CreateKMSClient() |
| resource_name = _CreateResourceName(crypto_key_info) |
| |
| # Use the KMS API to encrypt the data. |
| crypto_keys = kms_client.projects().locations().keyRings().cryptoKeys() |
| request = crypto_keys.encrypt( |
| name=resource_name, |
| body={'plaintext': base64.b64encode(plaintext.encode()).decode('ascii')}) |
| response = request.execute() |
| return base64.b64decode(response['ciphertext'].encode('ascii')) |
| |
| |
| def Decrypt(crypto_key_info, ciphertext): |
| """Decrypts data with the provided CryptoKey. |
| |
| This function decrypts the cipher text which is encrypted using the |
| provided CryptoKey. |
| |
| Args: |
| crypto_key_info: A KMSCryptoKey object, including all required key infos. |
| ciphertext: The string cipher text to be decrypted in bytes. |
| |
| Returns: |
| A string plain text. |
| """ |
| kms_client = _CreateKMSClient() |
| resource_name = _CreateResourceName(crypto_key_info) |
| |
| # Use the KMS API to decrypt the data. |
| crypto_keys = kms_client.projects().locations().keyRings().cryptoKeys() |
| request = crypto_keys.decrypt( |
| name=resource_name, |
| body={'ciphertext': base64.b64encode(ciphertext).decode('ascii')}) |
| response = request.execute() |
| return base64.b64decode(response['plaintext'].encode('ascii')).decode() |
| |
| |
| def DecryptFromFile(crypto_key_info, ciphertext_file_name): |
| """Decrypt data in a ciphertext_file. |
| |
| Args: |
| crypto_key_info: A KMSCryptoKey object, including all required key infos. |
| ciphertext_file_name: The string file name to store cipher text. |
| |
| Returns: |
| A string plain text after decryption. |
| """ |
| with open(ciphertext_file_name, 'rb') as ciphertext_file: |
| ciphertext = ciphertext_file.read() |
| |
| return Decrypt(crypto_key_info, ciphertext) |