blob: 6d5f8bd3b7188decd1518394bf5fef35551a98e3 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2021 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Use information in rebase database to create rebase spreadsheet
Required python modules:
google-api-python-client google-auth-httplib2 google-auth-oauthlib
The Google Sheets API needs to be enabled to run this script.
Also, you'll need to generate access credentials and store those
in credentials.json.
Disable pylint noise
pylint: disable=no-absolute-import
"""
import os
import pickle
from google.auth.transport.requests import (
Request, # pylint: disable=import-error, disable=no-name-in-module
)
from google.oauth2.service_account import (
Credentials, # pylint: disable=import-error
)
from google_auth_oauthlib.flow import (
InstalledAppFlow, # pylint: disable=import-error
)
from googleapiclient import discovery # pylint: disable=import-error
# OAuth scope list passed to the installed-app (user) authorization flow
# in getsheet().
SCOPES = ["https://www.googleapis.com/auth/spreadsheets"]
# Spreadsheet functions
def getsheet():
    """Get and return reference to spreadsheet.

    Credentials are looked up in the current working directory, in this
    order:

    1. credentials_service.json: service account credentials. If this file
       exists it takes precedence over the user's account.
    2. token.pickle: the user's cached access and refresh tokens. Expired
       credentials that carry a refresh token are refreshed.
    3. credentials.json: client secrets used to run the local authorization
       flow when no usable cached credentials are available.

    Credentials that had to be refreshed or newly obtained are written back
    to token.pickle for the next run.

    Returns:
        The spreadsheets() resource of the Sheets v4 API service.
    """
    # Service account has precedence over user's account.
    if os.path.exists("credentials_service.json"):
        creds = Credentials.from_service_account_file(
            "credentials_service.json"
        )
        service = discovery.build("sheets", "v4", credentials=creds)
        return service.spreadsheets()  # pylint: disable=no-member

    creds = None
    # The file token.pickle stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the first
    # time.
    # NOTE: unpickling can execute arbitrary code, so token.pickle must only
    # ever be a file that was written by this script.
    if os.path.exists("token.pickle"):
        with open("token.pickle", "rb") as token:
            try:  # py2 or token saved with py3
                creds = pickle.load(token)
            except UnicodeDecodeError:  # py3, token saved with py2
                # The failed attempt already consumed part of the stream;
                # rewind so the retry parses the pickle from its start.
                token.seek(0)
                creds = pickle.load(
                    token, encoding="latin-1"
                )  # pylint: disable=unexpected-keyword-arg

    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                "credentials.json", SCOPES
            )
            creds = flow.run_local_server(port=0)
        # Save the credentials for the next run
        with open("token.pickle", "wb") as token:
            pickle.dump(creds, token)

    service = discovery.build("sheets", "v4", credentials=creds)
    return service.spreadsheets()  # pylint: disable=no-member
def init_spreadsheet(filename, title):
    """Open (and clean) an existing spreadsheet or create a new one.

    Args:
        filename: Name of the file holding the spreadsheet id, or None.
            With None a new spreadsheet is always created and its id is
            not stored anywhere.
        title: Title to use if a spreadsheet has to be created.

    Returns:
        Tuple of (spreadsheets API resource, spreadsheet id).
    """
    api = getsheet()

    if filename is None:
        return (api, create_spreadsheet(api, title))

    try:
        # Reuse the spreadsheet whose id was stored by an earlier run and
        # wipe its previous contents.
        with open(filename, "r", encoding="utf-8") as id_file:
            ssid = id_file.read().strip("\n")
        metadata = api.get(
            spreadsheetId=ssid, ranges=[], includeGridData=False
        ).execute()
        delete_sheets((api, ssid), metadata.get("sheets"))
    except IOError:
        # Start over with a fresh spreadsheet and remember its id.
        ssid = create_spreadsheet(api, title)
        with open(filename, "w", encoding="utf-8") as id_file:
            id_file.write(ssid)

    return (api, ssid)
# Generic topic functions
def get_other_topic_id(c):
    """Return the topic id to use for the 'other' topic.

    Args:
        c: Database cursor with access to the topics table.

    Returns:
        The id of the topic named 'other' if the scan reaches such a row.
        Otherwise the first id above all existing topic ids (never below
        0, which is also the result when there are no topics).
    """
    c.execute("select topic, name from topics order by name")

    next_free_id = 0
    for topic_id, topic_name in c.fetchall():
        if topic_name == "other":
            return topic_id
        # Keep the candidate one above the largest id seen so far.
        next_free_id = (
            topic_id + 1 if topic_id >= next_free_id else next_free_id
        )
    return next_free_id
def get_topic_name(c, topic):
    """Get topic name from topic id.

    Args:
        c: Database cursor with access to the topics table. The query uses
            qmark ("?") parameter binding as provided by sqlite3.
        topic: Topic id to look up.

    Returns:
        Name of the topic, or None if there is no matching topic.
    """
    # Bind the id as a parameter instead of formatting it into the SQL
    # text, so that quotes in the value can not alter the statement.
    c.execute("select name from topics where topic is ?", (topic,))
    row = c.fetchone()
    return row[0] if row else None
# Spreadsheet manipulation functions
def doit(sheet, requests):
    """Send a batch of update requests to a spreadsheet.

    Args:
        sheet: Tuple of (spreadsheets API resource, spreadsheet id).
        requests: List of request dictionaries to pass to batchUpdate.

    Returns:
        The response obtained from executing the batchUpdate request.
    """
    return (
        sheet[0]
        .batchUpdate(spreadsheetId=sheet[1], body={"requests": requests})
        .execute()
    )
def hide_sheet(sheet, sheetId, hide):
    """Hide or unhide a sheet.

    Args:
        sheet: Tuple of (spreadsheets API resource, spreadsheet id).
        sheetId: Id of the sheet to update.
        hide: New value of the sheet's 'hidden' property.
    """
    properties = {"sheetId": sheetId, "hidden": hide}
    update = {"properties": properties, "fields": "hidden"}
    doit(sheet, [{"updateSheetProperties": update}])
def create_spreadsheet(sheet, title):
    """Create a new spreadsheet and return its id.

    Args:
        sheet: Spreadsheets API resource, as returned by getsheet().
        title: Title of the new spreadsheet.

    Returns:
        The 'spreadsheetId' entry of the API response.
    """
    properties = {"title": title, "locale": "en_US"}
    created = sheet.create(
        body={"properties": properties}, fields="spreadsheetId"
    ).execute()
    return created.get("spreadsheetId")
def delete_sheets(sheet, sheets):
    """Delete all sheets except sheet 0. In sheet 0, delete all rows.

    Args:
        sheet: Tuple of (spreadsheets API resource, spreadsheet id).
        sheets: List of sheet descriptions. Each entry provides
            ["properties"]["sheetId"]; the entry for sheet 0 also provides
            ["properties"]["gridProperties"]["rowCount"].
    """

    def cleanup_request(properties):
        """Return the request which removes or empties a single sheet."""
        sheet_id = properties["sheetId"]
        if sheet_id != 0:
            return {"deleteSheet": {"sheetId": sheet_id}}
        row_range = {
            "sheetId": sheet_id,
            "startRowIndex": 0,
            "endRowIndex": properties["gridProperties"]["rowCount"],
        }
        return {
            "deleteRange": {"range": row_range, "shiftDimension": "ROWS"}
        }

    # Sheet 0 ('Data') has to be visible: while it is hidden the other
    # sheets can't be removed.
    hide_sheet(sheet, 0, False)

    cleanup = [cleanup_request(entry["properties"]) for entry in sheets]

    # This is allowed to fail if there was nothing to clean. The failure
    # will hopefully result in re-creating the spreadsheet.
    doit(sheet, cleanup)
def resize_sheet(requests, sheetId, start, end):
    """Queue an auto-resize of a range of columns.

    Args:
        requests: Reference to list of requests to send to API.
        sheetId: Sheet Id
        start: Value for 'startIndex' of the column range.
        end: Value for 'endIndex' of the column range.
    """
    columns = {
        "sheetId": sheetId,
        "dimension": "COLUMNS",
        "startIndex": start,
        "endIndex": end,
    }
    requests.append({"autoResizeDimensions": {"dimensions": columns}})
def add_sheet_header(requests, sheetId, fields):
    """Queue requests which write a bold, centered header row.

    Args:
        requests: Reference to list of requests to send to API.
        sheetId: Sheet Id
        fields: string with comma-separated list of fields
    """
    # Paste the comma-separated fields into the first row.
    paste_header = {
        "data": fields,
        "type": "PASTE_NORMAL",
        "delimiter": ",",
        "coordinate": {"sheetId": sheetId, "rowIndex": 0},
    }
    requests.append({"pasteData": paste_header})

    # Format the first row as bold and centered.
    first_row = {"sheetId": sheetId, "startRowIndex": 0, "endRowIndex": 1}
    header_format = {
        "horizontalAlignment": "CENTER",
        "textFormat": {"bold": True},
    }
    format_header = {
        "range": first_row,
        "cell": {"userEnteredFormat": header_format},
        "fields": "userEnteredFormat(textFormat,horizontalAlignment)",
    }
    requests.append({"repeatCell": format_header})
def move_sheet(sheet, sheetId, to):
    """Move a sheet to the given position in the spreadsheet.

    Args:
        sheet: Tuple of (spreadsheets API resource, spreadsheet id).
        sheetId: Id of the sheet to move.
        to: New value of the sheet's 'index' property.
    """
    properties = {"sheetId": sheetId, "index": to}
    update = {"properties": properties, "fields": "index"}
    doit(sheet, [{"updateSheetProperties": update}])
def sort_sheet(request, sheetId, sortby, order, rows, columns):
    """Queue a request which sorts a sheet, leaving row 0 in place.

    Args:
        request: Reference to list of requests to send to API.
        sheetId: Sheet Id
        sortby: Index of the dimension to sort by.
        order: Sort order to pass on as 'sortOrder'.
        rows: Value for 'endRowIndex' of the sorted range.
        columns: Value for 'endColumnIndex' of the sorted range.
    """
    # The range starts at row index 1, i.e. below the first row.
    sorted_range = {
        "sheetId": sheetId,
        "startRowIndex": 1,
        "endRowIndex": rows,
        "startColumnIndex": 0,
        "endColumnIndex": columns,
    }
    sort_spec = {"dimensionIndex": sortby, "sortOrder": order}
    request.append(
        {"sortRange": {"range": sorted_range, "sortSpecs": [sort_spec]}}
    )
def source_range(sheetId, rows, column):
    """Return a source range covering one column of a sheet.

    Args:
        sheetId: Sheet Id
        rows: Value for 'endRowIndex'; the range starts at row index 0.
        column: Index of the column to cover.

    Returns:
        Dictionary with a single 'sourceRange' entry.
    """
    single_column = {
        "sheetId": sheetId,
        "startRowIndex": 0,
        "endRowIndex": rows,
        "startColumnIndex": column,
        "endColumnIndex": column + 1,
    }
    return {"sourceRange": {"sources": [single_column]}}
def scope(name, sheetId, rows, column):
    """Return a single-column source range stored under the given name.

    Args:
        name: Key under which the source range is stored.
        sheetId: Sheet Id
        rows: Number of rows, passed on to source_range().
        column: Column index, passed on to source_range().

    Returns:
        Dictionary mapping name to the result of source_range().
    """
    column_range = source_range(sheetId, rows, column)
    return {name: column_range}
def sscope(name, sheetId, rows, start, end):
    """Return one named source range per column from start through end.

    Args:
        name: Key under which each source range is stored.
        sheetId: Sheet Id
        rows: Number of rows, passed on to scope().
        start: Index of the first column.
        end: Index of the last column (inclusive).

    Returns:
        List of scope() results. The entry for start is always present,
        even if end is smaller than start.
    """
    last_column = max(start, end)
    return [
        scope(name, sheetId, rows, column)
        for column in range(start, last_column + 1)
    ]
def share_sheet(fileId, users, emailMsg):
    """Share spreadsheet with defined users/groups.

    Args:
        fileId: Id of the file to share.
        users: Dictionary mapping an e-mail address to a sequence whose
            first element is the permission type and whose second element
            is the role.
        emailMsg: Message passed to the API as 'emailMessage'.
    """
    service_file = "credentials_service.json"

    # This function applies only when a service account is being used for
    # creation of a spreadsheet. Otherwise the user has to manually share
    # the generated spreadsheet with other users/groups.
    if not os.path.exists(service_file):
        return

    creds = Credentials.from_service_account_file(service_file)
    drive = discovery.build(
        "drive", "v3", credentials=creds
    )  # pylint: disable=no-member

    for email, access in users.items():
        permission = {
            "emailAddress": email,
            "type": access[0],
            "role": access[1],
        }
        drive.permissions().create(  # pylint: disable=no-member
            fileId=fileId,
            body=permission,
            transferOwnership=False,
            emailMessage=emailMsg,
        ).execute()