blob: 3b9135d80bdc2032aa3151f5c328b57acc3556ec [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Manages "next ID" to process."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import json
from google.cloud import exceptions # pylint: disable=import-error,no-name-in-module
from google.cloud import datastore
class Checkpointer(object):
  """Manages checkpoints.

  A checkpoint is an entity in Google Cloud Datastore containing various
  information persisted between builder runs.

  The most important information in a checkpoint is "Next ID" which identifies
  the primary ID of the oldest unprocessed entry in the source database.
  On a successful run of the BigQuery table builder pipeline,
  (maximum ID of entries processed in a run) + 1 will be saved as a next ID,
  and on a next run it is read to continue processing subsequent entries.

  Checkpoint fields are stored in the entity as strings (None when unset) and
  parsed back into numbers by Load().
  """

  def __init__(self, project, kind, entity):
    """Initializes the checkpointer object.

    Args:
      project: Google Cloud project passed to datastore.Client.
      kind: Datastore kind of the checkpoint entity.
      entity: Name of the checkpoint entity under |kind|.
    """
    self.client = datastore.Client(project)
    self.kind = kind
    self.entity = entity

  def _GetEntity(self):
    """Fetches the checkpoint entity.

    Returns:
      The datastore entity keyed by (self.kind, self.entity).

    Raises:
      CheckpointError: When the entity is missing or empty.
    """
    key = self.client.key(self.kind, self.entity)
    content = self.client.get(key)
    if not content:
      raise CheckpointError(
          'Checkpoint entity is empty. If this is the first run, '
          'please create one under kind %s with name '
          '%s manually in Google Cloud Console.' %
          (self.kind, self.entity))
    return content

  @staticmethod
  def _Serialize(value):
    """Converts a Checkpoint field to its stored form: str, or None if unset."""
    # Writing str(None) would store the text 'None', which Load() cannot parse
    # back into a number.
    return None if value is None else str(value)

  def Load(self):
    """Loads a checkpoint.

    Returns:
      Checkpoint object. Its last_start_time is None when the entity does not
      record a value.

    Raises:
      CheckpointError: When the checkpoint is corrupted or not found.
    """
    content = self._GetEntity()

    raw_next_id = content.get('next_id')
    if raw_next_id is None:
      raise CheckpointError(
          'Checkpoint entity is corrupted: next_id is not set.')
    try:
      next_id = int(raw_next_id)
    except (TypeError, ValueError):
      raise CheckpointError(
          'Checkpoint entity is corrupted: next_id %r is not an integer.' %
          (raw_next_id,))
    # A zero next_id is rejected as well; it is treated as "not set".
    if not next_id:
      raise CheckpointError(
          'Checkpoint entity is corrupted: next_id is not set.')

    raw_last_start_time = content.get('last_start_time')
    # Checkpoint fields may be None. The property can be missing, or it can
    # hold the text 'None' if an unset value was ever stored via str(None).
    # Both cases load as None.
    if raw_last_start_time is None or raw_last_start_time == 'None':
      last_start_time = None
    else:
      try:
        last_start_time = float(raw_last_start_time)
      except (TypeError, ValueError):
        raise CheckpointError(
            'Checkpoint entity is corrupted: last_start_time %r is not a '
            'number.' % (raw_last_start_time,))

    return Checkpoint(next_id=next_id, last_start_time=last_start_time)

  def Save(self, checkpoint):
    """Saves a checkpoint.

    The read-modify-write runs inside a datastore transaction, and the entity
    must already exist.

    Args:
      checkpoint: Checkpoint object to be saved as an entity in datastore.
        Fields are stored as strings; None fields are stored as None.

    Raises:
      CheckpointError: When the checkpoint is not found.
    """
    with self.client.transaction():
      content = self._GetEntity()
      content['next_id'] = self._Serialize(checkpoint.next_id)
      content['last_start_time'] = self._Serialize(
          checkpoint.last_start_time)
      self.client.put(content)
# A checkpoint as read from or written to datastore.
# Compatibility rule: never remove a field, and never assume that any field
# is non-None.
Checkpoint = collections.namedtuple('Checkpoint', (
    # (int) ID of the next entry to process.
    'next_id',
    # (float) Seconds since the UNIX epoch at which the last build started.
    'last_start_time',
))
class CheckpointError(Exception):
  """Signals that the checkpoint entity is missing or holds corrupted data."""