blob: 2bc57495b1822bd217a73c9227695e1d21a246a9 [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Binary mailer implements the mailer server.
package main
import (
"context"
"encoding/hex"
"flag"
"fmt"
"net/smtp"
"net/textproto"
"time"
"github.com/jordan-wright/email"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.chromium.org/luci/auth/identity"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/rand/mathrand"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/server"
"go.chromium.org/luci/server/auth"
"go.chromium.org/luci/server/auth/openid"
"go.chromium.org/luci/server/caching"
"go.chromium.org/luci/server/limiter"
"go.chromium.org/luci/server/module"
"go.chromium.org/luci/server/redisconn"
"go.chromium.org/luci/mailer/api/mailer"
)
// Note: to run this code locally you'll need an SMTP server. A simple solution
// is to run a test SMTP server (like https://github.com/mailhog/MailHog) via
// Docker:
//
// $ docker run -p 127.0.0.1:1025:1025 -p 127.0.0.1:8025:8025 mailhog/mailhog
// $ ./mailer -smtp-port 1025
// <open http://127.0.0.1:8025 in the browser to see MailHog's UI>
func main() {
smtpPort := flag.Int("smtp-port", 1025,
`A port of a localhost SMTP server to use.`)
smtpPoolSize := flag.Int("smtp-pool-size", 100,
`The maximum number of parallel connections that can be made to the SMTP server.`)
callersGroup := flag.String("mailer-callers-group", "auth-mailer-access",
`A LUCI group with callers authorized to use this service.`)
modules := []module.Module{
limiter.NewModuleFromFlags(),
redisconn.NewModuleFromFlags(),
}
server.Main(nil, modules, func(srv *server.Server) error {
smtpAddr := fmt.Sprintf("127.0.0.1:%d", *smtpPort)
// Wait a bit until the SMTP server is up. This is important when it is
// launched by Kubernetes in parallel to launching `mailer` server itself.
if err := waitSMTP(srv.Context, smtpAddr); err != nil {
return errors.Annotate(err, "failed to connect to the SMTP server").Err()
}
// Use a connection pool to avoid opening/closing connections all the time.
pool, err := email.NewPool(smtpAddr, *smtpPoolSize, nil)
if err != nil {
return errors.Annotate(err, "failed to create SMTP connection pool").Err()
}
srv.RegisterCleanup(func(context.Context) { pool.Close() })
srv.PRPC.Authenticator = &auth.Authenticator{
Methods: []auth.Method{
// The primary authentication method.
&openid.GoogleIDTokenAuthMethod{
AudienceCheck: openid.AudienceMatchesHost,
SkipNonJWT: true, // pass OAuth2 access tokens through
},
// Backward compatibility for RPC Explorer and old clients.
&auth.GoogleOAuth2Method{
Scopes: []string{"https://www.googleapis.com/auth/userinfo.email"},
},
},
}
mailer.RegisterMailerServer(srv.PRPC, &mailerServer{
callersGroup: *callersGroup,
pool: pool,
cache: caching.GlobalCache(srv.Context, "mailer"),
})
return nil
})
}
// waitSMTP tries to ping an SMTP server in a loop.
func waitSMTP(ctx context.Context, addr string) error {
attempt := 0
for {
attempt++
switch err := pingSMTP(addr); {
case err == nil:
logging.Infof(ctx, "The SMTP server %q is ready", addr)
return nil
case attempt > 10:
return err
default:
logging.Warningf(ctx, "Waiting for the SMTP server: %s", err)
if res := clock.Sleep(ctx, 2*time.Second); res.Err != nil {
return res.Err // the server is shutting down already
}
}
}
}
// pingSMTP returns nil if it manages to connect to the SMTP server.
func pingSMTP(addr string) error {
client, err := smtp.Dial(addr)
if err != nil {
return err
}
defer client.Close()
return client.Noop()
}
type mailerServer struct {
mailer.UnimplementedMailerServer
callersGroup string // a LUCI group with authorized callers
pool *email.Pool // an SMTP connection pool
cache caching.BlobCache // to store requestID => messageID mapping
// Used in tests to mock pool.Send.
send func(msg *email.Email, timeout time.Duration) error
}
// SendMail implements the corresponding RPC method.
func (s *mailerServer) SendMail(ctx context.Context, req *mailer.SendMailRequest) (*mailer.SendMailResponse, error) {
caller := auth.CurrentIdentity(ctx)
switch yes, err := auth.IsMember(ctx, s.callersGroup); {
case err != nil:
logging.Errorf(ctx, "IsMember call failed when checking %q: %s", caller, err)
return nil, status.Errorf(codes.Internal, "failed to authorize the request")
case !yes:
logging.Errorf(ctx, "Unauthorized caller: %q", caller)
return nil, status.Errorf(codes.PermissionDenied, "caller %q is unauthorized", caller)
}
logging.Infof(ctx, "Caller: %q", caller)
logging.Infof(ctx, "Request ID: %q", req.RequestId)
requestDedupKey := ""
if req.RequestId != "" {
requestDedupKey = requestDedupCacheKey(caller, req.RequestId)
}
// If the request has RequestID, check if we already handled this request.
if requestDedupKey != "" {
switch msgID, err := s.deduplicateRequest(ctx, requestDedupKey); {
case err != nil:
logging.Errorf(ctx, "Failed to check for duplicate requests: %s", err)
return nil, status.Errorf(codes.Internal, "failed to check for duplicate requests: %s", err)
case msgID != "":
logging.Infof(ctx, "Deduplicated the request, message ID is %q", msgID)
return &mailer.SendMailResponse{MessageId: msgID}, nil
}
}
msgID, err := s.enqueueMail(ctx, req)
if err != nil {
logging.Errorf(ctx, "Failed to enqueue the mail: %s", err)
return nil, err
}
logging.Infof(ctx, "Message ID: %q", msgID)
// Best-effort at remembering the RequestID => MessageID mapping for
// deduplication. If this fails, returning an error here will be more harmful
// than just ignoring it: the client will retry and a thus send a duplicate
// email.
if requestDedupKey != "" {
if err := s.rememberRequestID(ctx, requestDedupKey, msgID); err != nil {
logging.Errorf(ctx,
"Failed to remember RequestID => MessageID association: %q => %q: %s",
requestDedupKey, msgID, err,
)
}
}
return &mailer.SendMailResponse{MessageId: msgID}, nil
}
// requestDedupCacheKey returns a cache key of a entry used to dedup requests.
func requestDedupCacheKey(caller identity.Identity, requestID string) string {
return fmt.Sprintf("reqID:v1:%s:%s", caller, requestID)
}
// deduplicateRequest checks if this request has already been performed.
//
// Returns the resulting message ID if it was or "" otherwise.
func (s *mailerServer) deduplicateRequest(ctx context.Context, cacheKey string) (string, error) {
if s.cache == nil {
logging.Warningf(ctx, "The cache is not configured, skipping deduplication check for %q", cacheKey)
return "", nil
}
switch val, err := s.cache.Get(ctx, cacheKey); {
case err == caching.ErrCacheMiss:
return "", nil
case err != nil:
return "", err
default:
return string(val), nil
}
}
// rememberRequestID stores RequestID => MessageID association.
func (s *mailerServer) rememberRequestID(ctx context.Context, cacheKey, messageID string) error {
if s.cache == nil {
return nil
}
return s.cache.Set(ctx, cacheKey, []byte(messageID), time.Hour)
}
// enqueueMail sends the mail to the SMTP server.
//
// Assumes all authorization checks have been made already. Returns the
// resulting message ID. Returns gRPC errors.
func (s *mailerServer) enqueueMail(ctx context.Context, req *mailer.SendMailRequest) (string, error) {
timeout := 10 * time.Second
if deadline, ok := ctx.Deadline(); ok {
timeout = deadline.Sub(clock.Now(ctx))
if timeout <= 0 {
return "", status.Error(codes.DeadlineExceeded, "deadline exceeded")
}
}
msg := email.NewEmail()
msg.From = req.Sender
msg.ReplyTo = req.ReplyTo
msg.To = req.To
msg.Cc = req.Cc
msg.Bcc = req.Bcc
msg.Subject = req.Subject
msg.Text = []byte(req.TextBody)
msg.HTML = []byte(req.HtmlBody)
// Generate a message ID and convert it into an RFC 2822 compliant form.
//
// Note that the SMTP server assigns its own ID to the message as well when
// acknowledging it with "250 Ok" response, but unfortunately `net/smtp`
// package (and consequently all Go packages based on it), doesn't expose this
// ID in its API (it totally ignores the text status attached to "250 Ok"
// response).
msgID := generateMessageID(ctx)
msg.Headers.Set("Message-Id", fmt.Sprintf("<%s@luci.api.cr.dev>", msgID))
send := s.send
if send == nil {
send = s.pool.Send
}
switch err := send(msg, timeout); {
case err == nil:
return msgID, nil
case err == email.ErrTimeout:
return "", status.Error(codes.DeadlineExceeded, "timeout when waiting for an SMTP connection")
case err == email.ErrClosed:
return "", status.Error(codes.Internal, "the mailer server is shutting down")
case isFatalSMTP(err):
return "", status.Errorf(codes.InvalidArgument, "failed to send the email: %s", err)
default:
return "", status.Errorf(codes.Internal, "transient SMTP error: %s", err)
}
}
// generateMessageID produces a new unique message identifier.
func generateMessageID(ctx context.Context) string {
var blob [20]byte
if _, err := mathrand.Read(ctx, blob[:]); err != nil {
panic(err)
}
return hex.EncodeToString(blob[:])
}
// isFatalSMTP recognizes fatal SMTP errors.
//
// We assume only SMTP replies with "Permanent Negative Completion reply"
// status codes are fatal and everything else (like network errors, transient
// SMTP replies, etc.) are transient.
//
// See https://datatracker.ietf.org/doc/html/rfc5321#section-4.2.1 which
// defines 5xx as "Permanent Negative Completion reply".
func isFatalSMTP(err error) bool {
if tpe, ok := err.(*textproto.Error); ok {
return tpe.Code >= 500
}
return false
}