blob: 614059f3c445823d9e913b428108a3f7b8285b6d [file]
// Copyright 2016 The LUCI Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package main
import (
"fmt"
"sort"
"strconv"
"strings"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/errors"
log "github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/deploytool/api/deploy"
"github.com/luci/luci-go/deploytool/managedfs"
)
// kubeDepoyedByMe is the "luci:managedBy" Kubernetes Deployment annotation
// indicating that a given Kubernetes Deployment is managed by this deployment
// tool.
const (
kubeManagedByKey = "luci.managedBy"
kubeManagedByMe = "luci-deploytool"
kubeDeployToolPrefix = "luci.deploytool/"
kubeVersionKey = kubeDeployToolPrefix + "version"
kubeSourceVersionKey = kubeDeployToolPrefix + "sourceVersion"
)
func isKubeDeployToolKey(v string) bool { return strings.HasPrefix(v, kubeDeployToolPrefix) }
// containerEngineDeployment is a consolidated Google Container Engine
// deployment configuration. It includes staged configurations for specific
// components, as well as global Google Container Engine state for a single
// cloud project.
//
// A container engine deployment is made up of specific clusters within
// Container Engine to independently manage/deploy. Each of these clusters is
// further made up of the Kubernetes pods that are configured to be deployed to
// those clusters. Finally, each pod is broken into specific Kubernetes
// components (Docker images) that the pod is composed of.
//
// Pods/containers are specified as Project Components, and are staged in
// component subdirectories.
//
// Clusters are configured independently. During deployment, pods/containers
// are deployed to clusters.
type containerEngineDeployment struct {
// project is the cloud project that this deployment is targeting.
project *layoutDeploymentCloudProject
// clusters is a map of the container engine clusters that have components
// being deployed.
clusters map[string]*containerEngineDeploymentCluster
// clusterNames is the sorted list of cluster names. It is prepared during
// staging.
clusterNames []string
// pods is the set of staged pods. Each pod corresponds to a single Project
// Component. This is separate from Clusters, since a single Pod may be
// deployed to multiple Clusters.
//
// There will be one Pod entry here per declaration, regardless of how many
// times it's deployed.
pods []*stagedGKEPod
// podMap is a map of pods to the clusters that they are deployed to.
podMap map[*layoutDeploymentGKEPod]*stagedGKEPod
// ignoreCurrentVersion is true if deployment should generate and push a new
// version even if its base parameters match the currently-deployed version.
ignoreCurrentVersion bool
// timestampSuffix is timestamp suffix appended to Docker image names. This is
// used to differentiate Docker images from the same version.
timestampSuffix string
}
func (d *containerEngineDeployment) addCluster(cluster *layoutDeploymentGKECluster) *containerEngineDeploymentCluster {
if c := d.clusters[cluster.Name]; c != nil {
return c
}
c := containerEngineDeploymentCluster{
gke: d,
cluster: cluster,
}
d.clusters[cluster.Name] = &c
d.clusterNames = append(d.clusterNames, cluster.Name)
return &c
}
func (d *containerEngineDeployment) maybeRegisterPod(pod *layoutDeploymentGKEPod) *stagedGKEPod {
// If this is the first time we've seen this pod, add it to our pods list.
if sp := d.podMap[pod]; sp != nil {
return sp
}
sp := &stagedGKEPod{
ContainerEnginePod: pod.ContainerEnginePod,
gke: d,
pod: pod,
}
if d.podMap == nil {
d.podMap = make(map[*layoutDeploymentGKEPod]*stagedGKEPod)
}
d.podMap[pod] = sp
d.pods = append(d.pods, sp)
return sp
}
func (d *containerEngineDeployment) stage(w *work, root *managedfs.Dir, params *deployParams) error {
d.ignoreCurrentVersion = params.ignoreCurrentVersion
// Build a common timestamp suffix for our Docker images.
d.timestampSuffix = strconv.FormatInt(clock.Now(w).Unix(), 10)
podRoot, err := root.EnsureDirectory("pods")
if err != nil {
return errors.Annotate(err).Reason("failed to create pods directory").Err()
}
// Stage in parallel. We will stage all pods before we stage any containers,
// as container staging requires some pod staging values to be populated.
err = w.RunMulti(func(workC chan<- func() error) {
// Check and get all Kubernetes contexts in series.
//
// These all share the same Kubernetes configuration file, so we don't want
// them to stomp each other if we did them in parallel.
workC <- func() error {
for _, name := range d.clusterNames {
cluster := d.clusters[name]
var err error
if cluster.kubeCtx, err = getContainerEngineKubernetesContext(w, cluster.cluster); err != nil {
return errors.Annotate(err).Reason("failed to get Kubernetes context for %(cluster)q").
D("cluster", cluster.cluster.Name).Err()
}
}
return nil
}
for _, pod := range d.pods {
pod := pod
workC <- func() error {
// Use the name of this Pod's Component for staging directory.
name := pod.pod.comp.comp.Name
podDir, err := podRoot.EnsureDirectory(name)
if err != nil {
return errors.Annotate(err).Reason("failed to create pod directory for %(pod)q").
D("pod", name).Err()
}
return pod.stage(w, podDir, params)
}
}
})
if err != nil {
return err
}
// Now that pods are deployed, deploy our clusters.
clusterRoot, err := root.EnsureDirectory("clusters")
if err != nil {
return errors.Annotate(err).Reason("failed to create clusters directory").Err()
}
return w.RunMulti(func(workC chan<- func() error) {
// Stage each cluster and pod in parallel.
for _, name := range d.clusterNames {
cluster := d.clusters[name]
workC <- func() error {
clusterDir, err := clusterRoot.EnsureDirectory(cluster.cluster.Name)
if err != nil {
return errors.Annotate(err).Reason("failed to create cluster directory for %(cluster)q").
D("cluster", cluster.cluster.Name).Err()
}
return cluster.stage(w, clusterDir)
}
}
})
}
func (d *containerEngineDeployment) localBuild(w *work) error {
return w.RunMulti(func(workC chan<- func() error) {
for _, pod := range d.pods {
pod := pod
workC <- func() error {
return pod.build(w)
}
}
})
}
func (d *containerEngineDeployment) push(w *work) error {
return w.RunMulti(func(workC chan<- func() error) {
for _, pod := range d.pods {
pod := pod
workC <- func() error {
return pod.push(w)
}
}
})
}
func (d *containerEngineDeployment) commit(w *work) error {
// Push all clusters in parallel.
return w.RunMulti(func(workC chan<- func() error) {
for _, name := range d.clusterNames {
cluster := d.clusters[name]
workC <- func() error {
return cluster.commit(w)
}
}
})
}
// containerEngineDeploymentCluster is the deployment configuration for a single
// Google Compute Engine cluster. This includes the cluster's aggregate global
// configuration, as well as any staged pods that are being deployed to this
// cluster.
type containerEngineDeploymentCluster struct {
// gke is the containerEngineDeployment that owns this cluster.
gke *containerEngineDeployment
// cluster is the underlying cluster configuration.
cluster *layoutDeploymentGKECluster
// pods is the sorted list of pod deployments in this cluster.
pods []*containerEngineBoundPod
// scopes is the set of all scopes across all pods registered to this cluster,
// regardless of which are being deployed.
scopes []string
// kubeCtx is the name of the Kubernetes context, as defined/installed by
// gcloud.
kubeCtx string
}
func (c *containerEngineDeploymentCluster) attachPod(pod *layoutDeploymentGKEPodBinding) {
c.pods = append(c.pods, &containerEngineBoundPod{
sp: c.gke.maybeRegisterPod(pod.pod),
c: c,
binding: pod,
})
}
func (c *containerEngineDeploymentCluster) stage(w *work, root *managedfs.Dir) error {
// Determine which scopes this cluster will need. This is across ALL pods
// registered with the cluster, not just deployed ones.
scopeMap := make(map[string]struct{})
for _, bp := range c.pods {
for _, scope := range bp.sp.pod.Scopes {
scopeMap[scope] = struct{}{}
}
}
c.scopes = make([]string, 0, len(scopeMap))
for scope := range scopeMap {
c.scopes = append(c.scopes, scope)
}
sort.Strings(c.scopes)
// Stage for each deploymend pod.
return w.RunMulti(func(workC chan<- func() error) {
for _, bp := range c.pods {
bp := bp
workC <- func() error {
stageDir, err := root.EnsureDirectory(string(bp.sp.pod.comp.comp.title))
if err != nil {
return errors.Annotate(err).Reason("failed to create staging directory").Err()
}
return bp.stage(w, stageDir)
}
}
})
}
func (c *containerEngineDeploymentCluster) commit(w *work) error {
// Push all pods in parallel.
return w.RunMulti(func(workC chan<- func() error) {
for _, bp := range c.pods {
bp := bp
workC <- func() error {
return bp.commit(w)
}
}
})
}
func (c *containerEngineDeploymentCluster) kubectl(w *work) (*kubeTool, error) {
return w.tools.kubectl(c.kubeCtx)
}
// containerEngineBoundPod is a single staged pod deployed to a
// specific GKE cluster.
type containerEngineBoundPod struct {
// sp is the staged pod.
sp *stagedGKEPod
// cluster is the cluster that this pod is deployed to.
c *containerEngineDeploymentCluster
// binding is the binding between sp and c.
binding *layoutDeploymentGKEPodBinding
// deploymentYAMLPath is the filesystem path to the deployment YAML.
deploymentYAMLPath string
}
func (bp *containerEngineBoundPod) stage(w *work, root *managedfs.Dir) error {
comp := bp.sp.pod.comp
// Build our pod-wide deployment YAML.
// Generate our deployment YAML.
depYAML := kubeBuildDeploymentYAML(bp.binding, bp.sp.deploymentName, bp.sp.imageMap)
depYAML.Metadata.addAnnotation(kubeManagedByKey, kubeManagedByMe)
depYAML.Metadata.addAnnotation(kubeVersionKey, bp.sp.version.String())
depYAML.Metadata.addAnnotation(kubeSourceVersionKey, comp.source().Revision)
depYAML.Spec.Template.Metadata.addLabel("luci/project", string(comp.comp.proj.title))
depYAML.Spec.Template.Metadata.addLabel("luci/component", string(comp.comp.title))
deploymentYAML := root.File("deployment.yaml")
if err := deploymentYAML.GenerateYAML(w, depYAML); err != nil {
return errors.Annotate(err).Reason("failed to generate deployment YAML").Err()
}
bp.deploymentYAMLPath = deploymentYAML.String()
return nil
}
func (bp *containerEngineBoundPod) commit(w *work) error {
kubectl, err := bp.c.kubectl(w)
if err != nil {
return errors.Annotate(err).Err()
}
// Get the current deployment status for this pod.
var (
kd kubeDeployment
currentVersion string
)
switch err := kubectl.getResource(w, fmt.Sprintf("deployments/%s", bp.sp.deploymentName), &kd); err {
case nil:
// Got deployment status.
md := kd.Metadata
if md == nil {
return errors.Reason("current deployment has no metadata").Err()
}
// Make sure the current deployment is managed by this tool.
v, ok := md.Annotations[kubeManagedByKey].(string)
if !ok {
return errors.Reason("missing '" + kubeManagedByKey + "' annotation").Err()
}
if v != kubeManagedByMe {
log.Fields{
"managedBy": v,
"deployment": bp.sp.deploymentName,
}.Errorf(w, "Current deployment is not managed.")
return errors.Reason("unknown manager %(managedBy)q").D("managedBy", v).Err()
}
// Is the current deployment tagged at the current version?
currentVersion, ok = md.Annotations[kubeVersionKey].(string)
if !ok {
return errors.Reason("missing '" + kubeVersionKey + "' annotation").Err()
}
cloudVersion, err := parseCloudProjectVersion(bp.c.gke.project.VersionScheme, currentVersion)
switch {
case err != nil:
if !bp.c.gke.ignoreCurrentVersion {
return errors.Annotate(err).Reason("failed to parse current version %(version)q").
D("version", currentVersion).Err()
}
log.Fields{
log.ErrorKey: err,
"currentVersion": currentVersion,
}.Warningf(w, "Could not parse current version, but configured to ignore this failure.")
case cloudVersion.String() == bp.sp.version.String():
if !bp.c.gke.ignoreCurrentVersion {
log.Fields{
"version": currentVersion,
}.Infof(w, "Deployed version matches deployment version; not committing.")
return nil
}
log.Fields{
"version": currentVersion,
}.Infof(w, "Deployed version matches deployment version, but configured to deploy anyway.")
}
// fallthrough to "kubectl apply" the new configuration.
fallthrough
case errKubeResourceNotFound:
// No current deployment, create a new one.
log.Fields{
"currentVersion": currentVersion,
"deployVersion": bp.sp.version,
}.Infof(w, "Deploying new pod configuration.")
if err := kubectl.exec("apply", "-f", bp.deploymentYAMLPath).check(w); err != nil {
return errors.Annotate(err).Reason("failed to create new deployment configuration").Err()
}
return nil
default:
return errors.Annotate(err).Reason("failed to get status for deployment %(deployment)q").
D("deployment", bp.sp.deploymentName).Err()
}
}
// stagedGKEPod is staging information for a Google Container Engine deployed
// Kubernetes Pod.
type stagedGKEPod struct {
*deploy.ContainerEnginePod
// gke is the container engine deployment that owns this pod.
gke *containerEngineDeployment
// pod is the deployment pod that this is staging.
pod *layoutDeploymentGKEPod
// version is the calculated cloud project version.
version cloudProjectVersion
// The name of the deployment for thie Component.
deploymentName string
// containers is the set of staged Kubernetes containers.
containers []*stagedKubernetesContainer
// goPath is the generate GOPATH for this container's sources.
goPath []string
// imageMap maps container names to their Docker image names.
imageMap map[string]string
}
func (sp *stagedGKEPod) cloudProject() *layoutDeploymentCloudProject {
return sp.pod.comp.dep.cloudProject
}
func (sp *stagedGKEPod) stage(w *work, root *managedfs.Dir, params *deployParams) error {
// Calculate the cloud project version for this pod.
if sp.version = params.forceVersion; sp.version == nil {
var err error
sp.version, err = makeCloudProjectVersion(sp.cloudProject(), sp.pod.comp.source())
if err != nil {
return errors.Annotate(err).Reason("failed to get cloud version").Err()
}
}
comp := sp.pod.comp
sp.deploymentName = fmt.Sprintf("%s--%s", comp.comp.proj.title, comp.comp.title)
sp.imageMap = make(map[string]string, len(sp.KubePod.Container))
sp.containers = make([]*stagedKubernetesContainer, len(sp.KubePod.Container))
for i, kc := range sp.KubePod.Container {
skc := stagedKubernetesContainer{
KubernetesPod_Container: kc,
pod: sp,
image: fmt.Sprintf("gcr.io/%s/%s:%s-%s",
sp.gke.project.Name, kc.Name, sp.version.String(), sp.gke.timestampSuffix),
}
sp.imageMap[kc.Name] = skc.image
sp.containers[i] = &skc
}
// All files in this pod will share a GOPATH. Generate it, if any of our
// containers use Go.
needsGoPath := false
for _, skc := range sp.containers {
if skc.needsGoPath() {
needsGoPath = true
break
}
}
if needsGoPath {
// Build a GOPATH from our sources.
// Construct a GOPATH for this module.
goPath, err := root.EnsureDirectory("gopath")
if err != nil {
return errors.Annotate(err).Reason("failed to create GOPATH base").Err()
}
if err := stageGoPath(w, comp, goPath); err != nil {
return errors.Annotate(err).Reason("failed to stage GOPATH").Err()
}
sp.goPath = []string{goPath.String()}
}
// Stage each of our containers.
containersDir, err := root.EnsureDirectory("containers")
if err != nil {
return errors.Annotate(err).Err()
}
err = w.RunMulti(func(workC chan<- func() error) {
// Stage each component.
for _, skc := range sp.containers {
skc := skc
workC <- func() error {
containerDir, err := containersDir.EnsureDirectory(skc.Name)
if err != nil {
return errors.Annotate(err).Err()
}
if err := skc.stage(w, containerDir); err != nil {
return errors.Annotate(err).Reason("failed to stage container %(container)q").
D("container", skc.Name).Err()
}
return nil
}
}
})
if err != nil {
return err
}
if err := root.CleanUp(); err != nil {
return errors.Annotate(err).Reason("failed to cleanup staging area").Err()
}
return nil
}
func (sp *stagedGKEPod) build(w *work) error {
// Build any containers within this pod.
return w.RunMulti(func(workC chan<- func() error) {
for _, cont := range sp.containers {
workC <- func() error {
return cont.build(w)
}
}
})
}
func (sp *stagedGKEPod) push(w *work) error {
// Build any containers within this pod.
return w.RunMulti(func(workC chan<- func() error) {
for _, cont := range sp.containers {
workC <- func() error {
return cont.push(w)
}
}
})
}
// stagedKubernetesContainer is staging information for a single Kubernetes
// Container.
type stagedKubernetesContainer struct {
*deploy.KubernetesPod_Container
// pod is the pod that owns this container.
pod *stagedGKEPod
// image is the Docker image URI for this container.
image string
// remoteImageExists is true if the image already exists on the remote. This
// is checked during the "build" phase.
remoteImageExists bool
buildFn func(*work) error
}
func (skc *stagedKubernetesContainer) needsGoPath() bool {
switch skc.Type {
case deploy.KubernetesPod_Container_GO:
return true
default:
return false
}
}
func (skc *stagedKubernetesContainer) stage(w *work, root *managedfs.Dir) error {
// Build each Component.
buildDir, err := root.EnsureDirectory("build")
if err != nil {
return errors.Annotate(err).Reason("failed to create build directory").Err()
}
if err := buildComponent(w, skc.pod.pod.comp, buildDir); err != nil {
return errors.Annotate(err).Reason("failed to build component").Err()
}
switch skc.Type {
case deploy.KubernetesPod_Container_GO:
// Specify how we are to be built.
skc.buildFn = func(w *work) error {
path, err := skc.pod.pod.comp.buildPath(skc.GetBuild())
if err != nil {
return errors.Annotate(err).Err()
}
return skc.buildGo(w, path)
}
default:
return errors.Reason("unknown Kubernetes pod type %(type)T").D("type", skc.Type).Err()
}
return nil
}
func (skc *stagedKubernetesContainer) build(w *work) error {
if f := skc.buildFn; f != nil {
return f(w)
}
return nil
}
// build builds the image associated with this container.
func (skc *stagedKubernetesContainer) buildGo(w *work, entryPath string) error {
gcloud, err := w.tools.gcloud(skc.pod.cloudProject().Name)
if err != nil {
return errors.Annotate(err).Reason("could not get gcloud tool").Err()
}
// Use "aedeploy" to gather GOPATH and build against our root.
aedeploy, err := w.tools.aedeploy(skc.pod.goPath)
if err != nil {
return errors.Annotate(err).Err()
}
x := gcloud.exec("docker", "--", "build", "-t", skc.image, ".")
return aedeploy.bootstrap(x).cwd(entryPath).check(w)
}
func (skc *stagedKubernetesContainer) push(w *work) error {
gcloud, err := w.tools.gcloud(skc.pod.cloudProject().Name)
if err != nil {
return errors.Annotate(err).Reason("could not get gcloud tool").Err()
}
if err := gcloud.exec("docker", "--", "push", skc.image).check(w); err != nil {
return errors.Annotate(err).Reason("failed to push Docker image %(image)q").
D("image", skc.image).Err()
}
return nil
}
func getContainerEngineKubernetesContext(w *work, cluster *layoutDeploymentGKECluster) (
string, error) {
// Generate our Kubernetes context name. This is derived from the Google
// Container Engine cluster parameters.
kubeCtx := fmt.Sprintf("gke_%s_%s_%s", cluster.cloudProject.Name, cluster.Zone, cluster.Name)
kubectl, err := w.tools.kubectl(kubeCtx)
if err != nil {
return "", errors.Annotate(err).Err()
}
// Check if the context is already installed in our Kubernetes configuration.
switch has, err := kubectl.hasContext(w); {
case err != nil:
return "", errors.Annotate(err).Reason("failed to check for Kubernetes context").Err()
case !has:
gcloud, err := w.tools.gcloud(cluster.cloudProject.Name)
if err != nil {
return "", errors.Annotate(err).Err()
}
// The context isn't cached, we will fetch it via:
// $ gcloud container clusters get-credentials
x := gcloud.exec(
"container", "clusters",
"get-credentials", cluster.Name,
"--zone", cluster.Zone)
if err := x.check(w); err != nil {
return "", errors.Annotate(err).Reason("failed to get cluster credentials").Err()
}
switch has, err = kubectl.hasContext(w); {
case err != nil:
return "", errors.Annotate(err).Reason("failed to confirm Kubernetes context").Err()
case !has:
return "", errors.Reason("context %(context)q missing after fetching credentials").D("context", kubeCtx).Err()
}
}
return kubeCtx, nil
}