# blob: 3b4f369a41ae1539f3b68937146523a1207f1402
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Task queues for fulfilling lease requests."""
import json
import logging
from google.appengine.ext import ndb
import webapp2
from components import decorators
from components import net
from components import pubsub
from components.machine_provider import rpc_messages
import metrics
import models
def maybe_notify_backend(message, hostname, policies):
    """Informs the backend of the status of a request if there's a Pub/Sub topic.

    Does nothing when the policies carry no (or an empty) 'backend_topic'.
    Otherwise publishes the message to the backend's topic, attaching the
    backend attributes from the policies plus a 'hostname' attribute, and
    increments the pubsub_messages_sent metric.

    Args:
      message: The message string to send.
      hostname: The hostname of the machine this message concerns.
      policies: A dict representation of an rpc_messages.Policies instance.
    """
    if not policies.get('backend_topic'):
        return

    topic = pubsub.full_topic_name(
        policies['backend_project'], policies['backend_topic'])
    # Read 'backend_attributes' defensively: a policies dict without any
    # attributes may omit the key entirely (or carry None), and that must
    # not prevent the notification from being published.
    attributes = {
        attribute['key']: attribute['value']
        for attribute in policies.get('backend_attributes') or ()
    }
    # Assigned last so the hostname wins over a same-named backend attribute.
    attributes['hostname'] = hostname
    pubsub.publish(topic, message, attributes)
    # There are relatively few backends, so it's safe to include the
    # backend topic/project as the value for the target field.
    metrics.pubsub_messages_sent.increment(fields={'target': topic})
def maybe_notify_lessee(request, response):
    """Tells the lessee the status of a request when a Pub/Sub topic is set.

    The JSON-encoded response is published, with no attributes, to the topic
    named by the request, and the pubsub_messages_sent metric is incremented
    with the target 'lessee'. Nothing happens if the request has no (or an
    empty) 'pubsub_topic'.

    Args:
      request: A dict representation of an rpc_messages.LeaseRequest instance.
      response: A dict representation of an rpc_messages.LeaseResponse instance.
    """
    if not request.get('pubsub_topic'):
        # No topic to publish to.
        return

    topic = pubsub.full_topic_name(
        request['pubsub_project'], request['pubsub_topic'])
    payload = json.dumps(response)
    pubsub.publish(topic, payload, {})
    metrics.pubsub_messages_sent.increment(fields={'target': 'lessee'})
class LeaseRequestFulfiller(webapp2.RequestHandler):
    """Task queue worker which fulfills lease requests."""

    @decorators.require_taskqueue('fulfill-lease-request')
    def post(self):
        """Fulfill a lease request.

        Decodes the task parameters, then notifies the backend (with the
        message 'LEASED' and the hostname from the response) and the lessee.

        Params:
          policies: JSON-encoded string representation of the
            rpc_messages.Policies governing this machine.
          request_json: JSON-encoded string representation of the
            rpc_messages.LeaseRequest being fulfilled.
          response_json: JSON-encoded string representation of the
            rpc_messages.LeaseResponse being delivered.
        """
        params = self.request
        policies = json.loads(params.get('policies'))
        lease_request = json.loads(params.get('request_json'))
        lease_response = json.loads(params.get('response_json'))

        leased_hostname = lease_response['hostname']
        maybe_notify_backend('LEASED', leased_hostname, policies)
        maybe_notify_lessee(lease_request, lease_response)
@ndb.transactional(xg=True)
def reclaim(machine_key):
    """Reclaims a machine.

    Deletes the machine's entity and, when the lease request referenced by
    the machine's lease_id can be loaded, clears that lease's machine_id and
    response hostname. Does nothing if the machine entity no longer exists.

    Args:
      machine_key: ndb.Key for a models.CatalogMachineEntry.
    """
    machine = machine_key.get()
    if not machine:
        return

    # Only look the lease up when the machine actually references one.
    lease = None
    if machine.lease_id:
        lease = models.LeaseRequest.get_by_id(machine.lease_id)

    if lease is None:
        # A missing lease must not block reclaiming the machine itself;
        # dereferencing None here would fail the whole transaction and leave
        # the machine entity in place.
        logging.warning(
            'LeaseRequest not found while reclaiming machine: %s', machine_key)
    else:
        lease.machine_id = None
        lease.response.hostname = None

    machine.key.delete()
    if lease is not None:
        lease.put()
class MachineReclaimer(webapp2.RequestHandler):
    """Task queue worker which reclaims machines."""

    @decorators.require_taskqueue('reclaim-machine')
    def post(self):
        """Reclaim a machine.

        Decodes the task parameters, notifies the backend (with the message
        'RECLAIMED') and the lessee, reclaims the machine, and finally
        increments the lease_requests_expired metric.

        Params:
          hostname: Hostname of the machine being reclaimed.
          machine_key: URL-safe ndb.Key for a models.CatalogMachineEntry.
          policies: JSON-encoded string representation of the
            rpc_messages.Policies governing this machine.
          request_json: JSON-encoded string representation of the
            rpc_messages.LeaseRequest being fulfilled.
          response_json: JSON-encoded string representation of the
            rpc_messages.LeaseResponse being delivered.
        """
        params = self.request
        hostname = params.get('hostname')
        urlsafe_key = params.get('machine_key')
        machine_key = ndb.Key(urlsafe=urlsafe_key)
        policies = json.loads(params.get('policies'))
        lease_request = json.loads(params.get('request_json'))
        lease_response = json.loads(params.get('response_json'))

        # The key must point at a catalog machine entry before anything is
        # published or modified.
        assert machine_key.kind() == 'CatalogMachineEntry', machine_key

        maybe_notify_backend('RECLAIMED', hostname, policies)
        maybe_notify_lessee(lease_request, lease_response)
        reclaim(machine_key)
        metrics.lease_requests_expired.increment()
def create_queues_app():
    """Returns the WSGI application routing task queue URLs to their workers."""
    routes = [
        ('/internal/queues/fulfill-lease-request', LeaseRequestFulfiller),
        ('/internal/queues/reclaim-machine', MachineReclaimer),
    ]
    return webapp2.WSGIApplication(routes)