// Binary mailer implements the mailer server.
package main
import (
// Note: to run this code locally you'll need an SMTP server. A simple solution
// is to run a test SMTP server (like via
// Docker:
// $ docker run -p -p mailhog/mailhog
// $ ./mailer -smtp-port 1025
// <open in the browser to see MailHog's UI>
func main() {
smtpPort := flag.Int("smtp-port", 1025,
`A port of a localhost SMTP server to use.`)
smtpPoolSize := flag.Int("smtp-pool-size", 100,
`The maximum number of parallel connections that can be made to the SMTP server.`)
callersGroup := flag.String("mailer-callers-group", "auth-mailer-access",
`A LUCI group with callers authorized to use this service.`)
modules := []module.Module{
server.Main(nil, modules, func(srv *server.Server) error {
smtpAddr := fmt.Sprintf("", *smtpPort)
// Wait a bit until the SMTP server is up. This is important when it is
// launched by Kubernetes in parallel to launching `mailer` server itself.
if err := waitSMTP(srv.Context, smtpAddr); err != nil {
return errors.Annotate(err, "failed to connect to the SMTP server").Err()
// Use a connection pool to avoid opening/closing connections all the time.
pool, err := email.NewPool(smtpAddr, *smtpPoolSize, nil)
if err != nil {
return errors.Annotate(err, "failed to create SMTP connection pool").Err()
srv.RegisterCleanup(func(context.Context) { pool.Close() })
srv.PRPC.Authenticator = &auth.Authenticator{
Methods: []auth.Method{
// The primary authentication method.
AudienceCheck: openid.AudienceMatchesHost,
SkipNonJWT: true, // pass OAuth2 access tokens through
// Backward compatibility for RPC Explorer and old clients.
Scopes: []string{""},
mailer.RegisterMailerServer(srv.PRPC, &mailerServer{
callersGroup: *callersGroup,
pool: pool,
cache: caching.GlobalCache(srv.Context, "mailer"),
return nil
// waitSMTP tries to ping an SMTP server in a loop.
func waitSMTP(ctx context.Context, addr string) error {
attempt := 0
for {
switch err := pingSMTP(addr); {
case err == nil:
logging.Infof(ctx, "The SMTP server %q is ready", addr)
return nil
case attempt > 10:
return err
logging.Warningf(ctx, "Waiting for the SMTP server: %s", err)
if res := clock.Sleep(ctx, 2*time.Second); res.Err != nil {
return res.Err // the server is shutting down already
// pingSMTP returns nil if it manages to connect to the SMTP server.
func pingSMTP(addr string) error {
client, err := smtp.Dial(addr)
if err != nil {
return err
defer client.Close()
return client.Noop()
type mailerServer struct {
callersGroup string // a LUCI group with authorized callers
pool *email.Pool // an SMTP connection pool
cache caching.BlobCache // to store requestID => messageID mapping
// Used in tests to mock pool.Send.
send func(msg *email.Email, timeout time.Duration) error
// SendMail implements the corresponding RPC method.
func (s *mailerServer) SendMail(ctx context.Context, req *mailer.SendMailRequest) (*mailer.SendMailResponse, error) {
caller := auth.CurrentIdentity(ctx)
switch yes, err := auth.IsMember(ctx, s.callersGroup); {
case err != nil:
logging.Errorf(ctx, "IsMember call failed when checking %q: %s", caller, err)
return nil, status.Errorf(codes.Internal, "failed to authorize the request")
case !yes:
logging.Errorf(ctx, "Unauthorized caller: %q", caller)
return nil, status.Errorf(codes.PermissionDenied, "caller %q is unauthorized", caller)
logging.Infof(ctx, "Caller: %q", caller)
logging.Infof(ctx, "Request ID: %q", req.RequestId)
requestDedupKey := ""
if req.RequestId != "" {
requestDedupKey = requestDedupCacheKey(caller, req.RequestId)
// If the request has RequestID, check if we already handled this request.
if requestDedupKey != "" {
switch msgID, err := s.deduplicateRequest(ctx, requestDedupKey); {
case err != nil:
logging.Errorf(ctx, "Failed to check for duplicate requests: %s", err)
return nil, status.Errorf(codes.Internal, "failed to check for duplicate requests: %s", err)
case msgID != "":
logging.Infof(ctx, "Deduplicated the request, message ID is %q", msgID)
return &mailer.SendMailResponse{MessageId: msgID}, nil
msgID, err := s.enqueueMail(ctx, req)
if err != nil {
logging.Errorf(ctx, "Failed to enqueue the mail: %s", err)
return nil, err
logging.Infof(ctx, "Message ID: %q", msgID)
// Best-effort at remembering the RequestID => MessageID mapping for
// deduplication. If this fails, returning an error here will be more harmful
// than just ignoring it: the client will retry and a thus send a duplicate
// email.
if requestDedupKey != "" {
if err := s.rememberRequestID(ctx, requestDedupKey, msgID); err != nil {
"Failed to remember RequestID => MessageID association: %q => %q: %s",
requestDedupKey, msgID, err,
return &mailer.SendMailResponse{MessageId: msgID}, nil
// requestDedupCacheKey returns a cache key of a entry used to dedup requests.
func requestDedupCacheKey(caller identity.Identity, requestID string) string {
return fmt.Sprintf("reqID:v1:%s:%s", caller, requestID)
// deduplicateRequest checks if this request has already been performed.
// Returns the resulting message ID if it was or "" otherwise.
func (s *mailerServer) deduplicateRequest(ctx context.Context, cacheKey string) (string, error) {
if s.cache == nil {
logging.Warningf(ctx, "The cache is not configured, skipping deduplication check for %q", cacheKey)
return "", nil
switch val, err := s.cache.Get(ctx, cacheKey); {
case err == caching.ErrCacheMiss:
return "", nil
case err != nil:
return "", err
return string(val), nil
// rememberRequestID stores RequestID => MessageID association.
func (s *mailerServer) rememberRequestID(ctx context.Context, cacheKey, messageID string) error {
if s.cache == nil {
return nil
return s.cache.Set(ctx, cacheKey, []byte(messageID), time.Hour)
// enqueueMail sends the mail to the SMTP server.
// Assumes all authorization checks have been made already. Returns the
// resulting message ID. Returns gRPC errors.
func (s *mailerServer) enqueueMail(ctx context.Context, req *mailer.SendMailRequest) (string, error) {
timeout := 10 * time.Second
if deadline, ok := ctx.Deadline(); ok {
timeout = deadline.Sub(clock.Now(ctx))
if timeout <= 0 {
return "", status.Error(codes.DeadlineExceeded, "deadline exceeded")
msg := email.NewEmail()
msg.From = req.Sender
msg.ReplyTo = req.ReplyTo
msg.To = req.To
msg.Cc = req.Cc
msg.Bcc = req.Bcc
msg.Subject = req.Subject
msg.Text = []byte(req.TextBody)
msg.HTML = []byte(req.HtmlBody)
// Generate a message ID and convert it into an RFC 2822 compliant form.
// Note that the SMTP server assigns its own ID to the message as well when
// acknowledging it with "250 Ok" response, but unfortunately `net/smtp`
// package (and consequently all Go packages based on it), doesn't expose this
// ID in its API (it totally ignores the text status attached to "250 Ok"
// response).
msgID := generateMessageID(ctx)
msg.Headers.Set("Message-Id", fmt.Sprintf("<>", msgID))
send := s.send
if send == nil {
send = s.pool.Send
switch err := send(msg, timeout); {
case err == nil:
return msgID, nil
case err == email.ErrTimeout:
return "", status.Error(codes.DeadlineExceeded, "timeout when waiting for an SMTP connection")
case err == email.ErrClosed:
return "", status.Error(codes.Internal, "the mailer server is shutting down")
case isFatalSMTP(err):
return "", status.Errorf(codes.InvalidArgument, "failed to send the email: %s", err)
return "", status.Errorf(codes.Internal, "transient SMTP error: %s", err)
// generateMessageID produces a new unique message identifier.
func generateMessageID(ctx context.Context) string {
var blob [20]byte
if _, err := mathrand.Read(ctx, blob[:]); err != nil {
return hex.EncodeToString(blob[:])
// isFatalSMTP recognizes fatal SMTP errors.
// We assume only SMTP replies with "Permanent Negative Completion reply"
// status codes are fatal and everything else (like network errors, transient
// SMTP replies, etc.) are transient.
// See which
// defines 5xx as "Permanent Negative Completion reply".
func isFatalSMTP(err error) bool {
if tpe, ok := err.(*textproto.Error); ok {
return tpe.Code >= 500
return false