blob: be0a8481c516c872ebaf8ad60aa768fc87ef5470 [file]
// The deploy command deploys scheduled queries, read from the given files, into BigQuery.
package main
import (
"context"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"regexp"
"strings"
datatransfer "cloud.google.com/go/bigquery/datatransfer/apiv1"
"google.golang.org/api/iterator"
datatransferpb "google.golang.org/genproto/googleapis/cloud/bigquery/datatransfer/v1"
"google.golang.org/protobuf/types/known/fieldmaskpb"
"google.golang.org/protobuf/types/known/structpb"
)
// main parses the command line, reads the scheduled query definitions from
// the files given as arguments and reconciles them with the scheduled queries
// that already exist in the project:
//
//   - a query whose display name is not deployed yet is created;
//   - a query whose schedule or query text differs from the deployed one is
//     updated;
//   - everything else is left untouched.
//
// Any failure is fatal and terminates the process with a non-zero status.
func main() {
	log.SetFlags(0)
	ctx := context.Background()

	projectID := flag.String("project_id", "", "project ID")
	flag.Usage = func() {
		// Write to the same destination as PrintDefaults so the usage text
		// is not split between stdout and stderr.
		out := flag.CommandLine.Output()
		fmt.Fprintln(out, "Usage: deploy [--flags] file ...")
		flag.PrintDefaults()
		fmt.Fprintln(out, `
Files contain the queries to schedule and should have the following headers:
--name:<name> Display name of the scheduled query. If the scheduled
query exists, it will be replaced if there are any
changes.
--schedule:<schedule> Schedule for the query. Consult BigQuery documentation for
allowed formats.
Note: If the name is changed, a new schedule job will be created and the old one
will need to be deleted manually`)
	}
	flag.Parse()

	// Argument errors are diagnostics, so they go to stderr like every other
	// error this command reports.
	if *projectID == "" {
		fmt.Fprintln(os.Stderr, "no project specified")
		os.Exit(1)
	}
	if len(flag.Args()) == 0 {
		fmt.Fprintln(os.Stderr, "no files specified")
		os.Exit(1)
	}

	// Parse the local files before touching the network so malformed input
	// is reported without needing credentials.
	queries, err := scheduledQueries(flag.Args())
	if err != nil {
		log.Fatal(err)
	}

	client, err := datatransfer.NewClient(ctx)
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	defer client.Close()

	configs, err := fetchTransferConfigs(ctx, client, *projectID)
	if err != nil {
		log.Fatalf("Failed to get scheduled queries: %v", err)
	}

	for _, query := range queries {
		existing, ok := configs[query.name]
		if !ok {
			created, err := addScheduledQuery(ctx, client, *projectID, query)
			if err != nil {
				log.Fatalf("%s: error creating scheduled query [%v]", query.name, err)
			}
			log.Printf("%s: created, next run at %s", query.name, created.GetNextRunTime().AsTime())
			continue
		}

		// The generated getters tolerate nil receivers, so a deployed config
		// without params compares as an empty query instead of panicking.
		deployed := existing.GetParams().GetFields()["query"].GetStringValue()
		if existing.GetSchedule() == query.schedule && deployed == query.query {
			log.Printf("%s: no changes found", query.name)
			continue
		}

		updated, err := updateScheduledQuery(ctx, client, existing, query)
		if err != nil {
			log.Fatalf("%s: error updating scheduled query [%v]", query.name, err)
		}
		log.Printf("%s: updated, next run at %s", query.name, updated.GetNextRunTime().AsTime())
	}
}
// fetchTransferConfigs lists the transfer configs of projectID whose data
// source is "scheduled_query" and returns them keyed by display name.
//
// It returns an error if listing fails or if two configs share a display
// name, because the display name is the map key and a duplicate would make
// the lookup ambiguous. No partial map is returned on error.
func fetchTransferConfigs(ctx context.Context, client *datatransfer.Client, projectID string) (map[string]*datatransferpb.TransferConfig, error) {
	req := &datatransferpb.ListTransferConfigsRequest{
		Parent:        fmt.Sprintf("projects/%s", projectID),
		DataSourceIds: []string{"scheduled_query"},
	}

	configs := make(map[string]*datatransferpb.TransferConfig)
	it := client.ListTransferConfigs(ctx, req)
	for {
		config, err := it.Next()
		if err == iterator.Done {
			return configs, nil
		}
		if err != nil {
			return nil, err
		}
		// Report duplicates to the caller instead of exiting the process
		// from inside a function that already returns an error.
		if _, exists := configs[config.DisplayName]; exists {
			return nil, fmt.Errorf("multiple scheduled queries with display name [%v], please fix and run again", config.DisplayName)
		}
		configs[config.DisplayName] = config
	}
}
// addScheduledQuery creates a new "scheduled_query" transfer config for query
// under projects/<projectID>/locations/US, with failure e-mails enabled, and
// returns the config reported back by the service.
func addScheduledQuery(ctx context.Context, client *datatransfer.Client, projectID string, query scheduledQuery) (*datatransferpb.TransferConfig, error) {
	// The query text is passed through the config's free-form params.
	params := &structpb.Struct{
		Fields: map[string]*structpb.Value{
			"query": structpb.NewStringValue(query.query),
		},
	}
	transferConfig := &datatransferpb.TransferConfig{
		DisplayName:      query.name,
		DataSourceId:     "scheduled_query",
		Schedule:         query.schedule,
		EmailPreferences: &datatransferpb.EmailPreferences{EnableFailureEmail: true},
		Params:           params,
	}
	return client.CreateTransferConfig(ctx, &datatransferpb.CreateTransferConfigRequest{
		Parent:         fmt.Sprintf("projects/%s/locations/US", projectID),
		TransferConfig: transferConfig,
	})
}
// updateScheduledQuery sends an update for the transfer config identified by
// config.Name, carrying the schedule and query text from query. Only the
// schedule and params fields are named in the update mask. It returns the
// config reported back by the service.
func updateScheduledQuery(ctx context.Context, client *datatransfer.Client, config *datatransferpb.TransferConfig, query scheduledQuery) (*datatransferpb.TransferConfig, error) {
	patch := &datatransferpb.TransferConfig{
		Name:     config.Name,
		Schedule: query.schedule,
		Params: &structpb.Struct{
			Fields: map[string]*structpb.Value{
				"query": structpb.NewStringValue(query.query),
			},
		},
	}
	req := &datatransferpb.UpdateTransferConfigRequest{TransferConfig: patch}

	// fieldmaskpb.New checks the paths against the message it is given; the
	// request message is passed here, hence the "transfer_config." prefix.
	// NOTE(review): confirm the service expects request-relative rather than
	// resource-relative mask paths.
	mask, err := fieldmaskpb.New(req, "transfer_config.schedule", "transfer_config.params")
	if err != nil {
		return nil, err
	}
	req.UpdateMask = mask

	return client.UpdateTransferConfig(ctx, req)
}
// scheduledQuery is one query definition parsed from an input file.
type scheduledQuery struct {
	// name is the value of the file's --name header; it is used as the
	// display name of the scheduled query.
	name string
	// query is the complete content of the file, header lines included.
	query string
	// schedule is the value of the file's --schedule header.
	schedule string
}
// Header patterns for query files. Each one captures everything between the
// header key and the end of that line; the line must be newline-terminated.
var (
	// nameRegex captures the value of a "--name:<name>" header.
	nameRegex = regexp.MustCompile(`--name:([^\n]+)\n`)
	// scheduleRegex captures the value of a "--schedule:<schedule>" header.
	scheduleRegex = regexp.MustCompile(`--schedule:([^\n]+)\n`)
)
// scheduledQueries reads every file in files and parses it into a
// scheduledQuery, preserving the order of files.
//
// Each file must contain a "--name:<name>" and a "--schedule:<schedule>"
// header. Header values are stripped of all surrounding whitespace, so files
// with CRLF line endings or tab-padded values yield clean names and
// schedules. The complete file content, headers included, is kept as the
// query text.
//
// It returns an error, and no queries, for the first file that cannot be
// read or that lacks a non-empty name or schedule.
func scheduledQueries(files []string) ([]scheduledQuery, error) {
	queries := make([]scheduledQuery, 0, len(files))
	for _, file := range files {
		content, err := ioutil.ReadFile(file)
		if err != nil {
			return nil, err
		}
		query := scheduledQuery{query: string(content)}

		// A header whose value is only whitespace is treated the same as a
		// missing header.
		if m := nameRegex.FindStringSubmatch(query.query); m != nil {
			query.name = strings.TrimSpace(m[1])
		}
		if query.name == "" {
			return nil, fmt.Errorf("%s has no name specified", file)
		}

		if m := scheduleRegex.FindStringSubmatch(query.query); m != nil {
			query.schedule = strings.TrimSpace(m[1])
		}
		if query.schedule == "" {
			return nil, fmt.Errorf("%s has no schedule specified", file)
		}

		queries = append(queries, query)
	}
	return queries, nil
}