blob: 93a5616105c04aa04d6def1d8a671b89d00f6f86 [file] [log] [blame]
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""lib of Google Cloud KMS, an encryption key management system on GCP.
Used for encrypt/decrypt credentials, e.g. API keys.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import base64
import collections
from google.oauth2 import service_account # pylint: disable=import-error,no-name-in-module
from googleapiclient import discovery
from test_analyzer import configs
# A KMS Crypto Key's information, including
# project_id: a string, refers to the project that registers KMS,
# location_id: a string, refers to the geographical location where the
# cryptographic keys are stored.
# key_ring_id: a string, refers to the id of key ring, a concept of a
# grouping of keys.
# crypto_key_id: a string, refers to the id of a cryptographic key.
KMSCryptoKey = collections.namedtuple(
'KMSCryptoKEY',
[
'project_id',
'location_id',
'key_ring_id',
'crypto_key_id',
])
DEFAULT_CRYPTO_KEY_FOR_API = KMSCryptoKey(
project_id=configs.PROJECT_ID_PROD,
location_id='global',
key_ring_id='creds',
crypto_key_id='apikey')
def _CreateKMSClient():
"""Create a KMS Client to connect to cloudkms service.
Returns:
A discovery.build client for connecting to cloudkms.
"""
# We will only kms crypte key built in prod instance to encrypt/decrypt.
secret_json = configs.GetServiceAccountCredentials(is_stage=False)
credentials = service_account.Credentials.from_service_account_file(
secret_json)
return discovery.build('cloudkms', 'v1', credentials=credentials)
def _CreateResourceName(crypto_key_info):
"""Create a resource name to encrypt/decrypt.
Args:
crypto_key_info: A KMSCryptoKey object, including all required key infos.
"""
return 'projects/%s/locations/%s/keyRings/%s/cryptoKeys/%s' % (
crypto_key_info.project_id, crypto_key_info.location_id,
crypto_key_info.key_ring_id, crypto_key_info.crypto_key_id)
def Encrypt(crypto_key_info, plaintext):
"""Encrypts data with the provided CryptoKey.
This function encrypts plaintext using the provided CryptoKey. It can only be
recovered with Decrypt().
Args:
crypto_key_info: A KMSCryptoKey object, including all required key infos.
plaintext: The string plain text to be encrypted.
Returns:
A string cipher text in bytes.
"""
kms_client = _CreateKMSClient()
resource_name = _CreateResourceName(crypto_key_info)
# Use the KMS API to encrypt the data.
crypto_keys = kms_client.projects().locations().keyRings().cryptoKeys()
request = crypto_keys.encrypt(
name=resource_name,
body={'plaintext': base64.b64encode(plaintext.encode()).decode('ascii')})
response = request.execute()
return base64.b64decode(response['ciphertext'].encode('ascii'))
def Decrypt(crypto_key_info, ciphertext):
"""Decrypts data with the provided CryptoKey.
This function decrypts the cipher text which is encrypted using the
provided CryptoKey.
Args:
crypto_key_info: A KMSCryptoKey object, including all required key infos.
ciphertext: The string cipher text to be decrypted in bytes.
Returns:
A string plain text.
"""
kms_client = _CreateKMSClient()
resource_name = _CreateResourceName(crypto_key_info)
# Use the KMS API to decrypt the data.
crypto_keys = kms_client.projects().locations().keyRings().cryptoKeys()
request = crypto_keys.decrypt(
name=resource_name,
body={'ciphertext': base64.b64encode(ciphertext).decode('ascii')})
response = request.execute()
return base64.b64decode(response['plaintext'].encode('ascii')).decode()
def DecryptFromFile(crypto_key_info, ciphertext_file_name):
"""Decrypt data in a ciphertext_file.
Args:
crypto_key_info: A KMSCryptoKey object, including all required key infos.
ciphertext_file_name: The string file name to store cipher text.
Returns:
A string plain text after decryption.
"""
with open(ciphertext_file_name, 'rb') as ciphertext_file:
ciphertext = ciphertext_file.read()
return Decrypt(crypto_key_info, ciphertext)