blob: 2426999f3708c4c84dedfda9087564d87955a560 [file] [log] [blame]
# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
from google.appengine.api import taskqueue
from google.appengine.ext import ndb
def enqueue_async(
queue_name, task_kwargs, transactional=True
): # pragma: no cover
"""Enqueues tasks. Mocked in tests.
kwargs['payload'] and kwargs['retry_options'] may be dicts.
Payload would be serialized to JSON.
tasks = []
for kwargs in task_kwargs:
kwargs = kwargs.copy()
if isinstance(kwargs.get('payload'), dict):
kwargs['payload'] = json.dumps(kwargs.get('payload'), sort_keys=True)
if isinstance(kwargs.get('retry_options'), dict):
kwargs['retry_options'] = taskqueue.TaskRetryOptions(
q = taskqueue.Queue(queue_name)
# Cannot just return add_async's return value because it is
# a non-Future object and does not play nice with `yield fut1, fut2` construct
yield q.add_async(tasks, transactional=transactional)