Replace "default" queue with "current" queue if queue setting is not provided.

Revision created by MOE tool push_codebase.
MOE_MIGRATION=6937


git-svn-id: https://appengine-pipeline.googlecode.com/svn/trunk@153 1742be92-6d41-bc97-00b7-290b722d530a
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java
index 4e3b748..3e6ee15 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java
@@ -545,6 +545,18 @@
     return thisJobRecord.getStatusConsoleUrl();
   }
 
+  protected String getOnQueue() {
+    return thisJobRecord.getQueueSettings().getOnQueue();
+  }
+
+  protected String getOnBackend() {
+    return thisJobRecord.getQueueSettings().getOnBackend();
+  }
+
+  protected String getOnModule() {
+    return thisJobRecord.getQueueSettings().getOnModule();
+  }
+
   /**
    * Returns the job's display name. Used for presentation purpose only.
    */
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/JobSetting.java b/java/src/main/java/com/google/appengine/tools/pipeline/JobSetting.java
index c7b19ab..533c76e 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/JobSetting.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/JobSetting.java
@@ -175,8 +175,8 @@
     private static final long serialVersionUID = -5010485721032395432L;
     public static final String DEFAULT = null;
 
-    public OnQueue(String backend) {
-      super(backend);
+    public OnQueue(String queue) {
+      super(queue);
     }
   }
 
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
index 7e958cc..9778de2 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
@@ -40,9 +40,9 @@
 import com.google.appengine.tools.pipeline.impl.model.SlotDescriptor;
 import com.google.appengine.tools.pipeline.impl.servlets.PipelineServlet;
 import com.google.appengine.tools.pipeline.impl.tasks.CancelJobTask;
+import com.google.appengine.tools.pipeline.impl.tasks.DelayedSlotFillTask;
 import com.google.appengine.tools.pipeline.impl.tasks.DeletePipelineTask;
 import com.google.appengine.tools.pipeline.impl.tasks.FanoutTask;
-import com.google.appengine.tools.pipeline.impl.tasks.FillSlotHandleSlotFilledTask;
 import com.google.appengine.tools.pipeline.impl.tasks.FinalizeJobTask;
 import com.google.appengine.tools.pipeline.impl.tasks.HandleChildExceptionTask;
 import com.google.appengine.tools.pipeline.impl.tasks.HandleSlotFilledTask;
@@ -520,24 +520,19 @@
     try {
       switch (task.getType()) {
         case RUN_JOB:
-          RunJobTask runJobTask = (RunJobTask) task;
-          runJob(runJobTask.getJobKey());
+          runJob((RunJobTask) task);
           break;
         case HANDLE_SLOT_FILLED:
-          HandleSlotFilledTask hsfTask = (HandleSlotFilledTask) task;
-          handleSlotFilled(hsfTask.getSlotKey());
+          handleSlotFilled((HandleSlotFilledTask) task);
           break;
         case FINALIZE_JOB:
-          FinalizeJobTask finalizeJobTask = (FinalizeJobTask) task;
-          finalizeJob(finalizeJobTask.getJobKey());
+          finalizeJob((FinalizeJobTask) task);
           break;
         case FAN_OUT:
-          FanoutTask fanoutTask = (FanoutTask) task;
-          handleFanoutTaskOrAbandonTask(fanoutTask);
+          handleFanoutTaskOrAbandonTask((FanoutTask) task);
           break;
         case CANCEL_JOB:
-          CancelJobTask cancelJobTask = (CancelJobTask) task;
-          cancelJob(cancelJobTask.getJobKey());
+          cancelJob((CancelJobTask) task);
           break;
         case DELETE_PIPELINE:
           DeletePipelineTask deletePipelineTask = (DeletePipelineTask) task;
@@ -549,14 +544,10 @@
           }
           break;
         case HANDLE_CHILD_EXCEPTION:
-          HandleChildExceptionTask handleChildExceptionTask = (HandleChildExceptionTask) task;
-          handleChildException(
-              handleChildExceptionTask.getKey(), handleChildExceptionTask.getFailedChildKey());
+          handleChildException((HandleChildExceptionTask) task);
           break;
-        case FILL_SLOT_HANDLE_SLOT_FILLED:
-          FillSlotHandleSlotFilledTask fillSlotHandleSlotFilledTask =
-              (FillSlotHandleSlotFilledTask) task;
-          handleFillSlotHandleFilled(fillSlotHandleSlotFilledTask);
+        case DELAYED_SLOT_FILL:
+          handleDelayedSlotFill((DelayedSlotFillTask) task);
           break;
         default:
           throw new IllegalArgumentException("Unrecognized task type: " + task.getType());
@@ -672,12 +663,11 @@
    * a {@link HandleSlotFilledTask} for any slots that are filled immediately.
    *
    * @see "http://goto/java-pipeline-model"
-   *
-   * @param jobKey
    */
-  private static void runJob(Key jobKey) {
-    JobRecord jobRecord = null;
-    jobRecord = queryJobOrAbandonTask(jobKey, JobRecord.InflationType.FOR_RUN);
+  private static void runJob(RunJobTask task) {
+    Key jobKey = task.getJobKey();
+    JobRecord jobRecord = queryJobOrAbandonTask(jobKey, JobRecord.InflationType.FOR_RUN);
+    jobRecord.getQueueSettings().merge(task.getQueueSettings());
     Key rootJobKey = jobRecord.getRootJobKey();
     logger.info("Running pipeline job " + jobKey.getName() + "; UI at "
         + PipelineServlet.makeViewerUrl(rootJobKey, jobKey));
@@ -814,9 +804,10 @@
         updateSpec, jobRecord.getQueueSettings(), jobKey, State.WAITING_TO_RUN, State.RETRY);
   }
 
-  private static void cancelJob(Key jobKey) {
-    JobRecord jobRecord = null;
-    jobRecord = queryJobOrAbandonTask(jobKey, JobRecord.InflationType.FOR_RUN);
+  private static void cancelJob(CancelJobTask cancelJobTask) {
+    Key jobKey = cancelJobTask.getJobKey();
+    JobRecord jobRecord = queryJobOrAbandonTask(jobKey, JobRecord.InflationType.FOR_RUN);
+    jobRecord.getQueueSettings().merge(cancelJobTask.getQueueSettings());
     Key rootJobKey = jobRecord.getRootJobKey();
     logger.info("Cancelling pipeline job " + jobKey.getName());
     JobRecord rootJobRecord = jobRecord;
@@ -955,8 +946,11 @@
     }
   }
 
-  private static void handleChildException(Key jobKey, Key failedChildKey) {
+  private static void handleChildException(HandleChildExceptionTask handleChildExceptionTask) {
+    Key jobKey = handleChildExceptionTask.getKey();
+    Key failedChildKey = handleChildExceptionTask.getFailedChildKey();
     JobRecord jobRecord = queryJobOrAbandonTask(jobKey, JobRecord.InflationType.FOR_RUN);
+    jobRecord.getQueueSettings().merge(handleChildExceptionTask.getQueueSettings());
     Key rootJobKey = jobRecord.getRootJobKey();
     logger.info("Running pipeline job " + jobKey.getName() + "; UI at "
         + PipelineServlet.makeViewerUrl(rootJobKey, jobKey));
@@ -990,14 +984,13 @@
    * output slot.
    *
    * @see "http://goto/java-pipeline-model"
-   *
-   * @param jobKey
    */
-  private static void finalizeJob(Key jobKey) {
+  private static void finalizeJob(FinalizeJobTask finalizeJobTask) {
+    Key jobKey = finalizeJobTask.getJobKey();
     // Get the JobRecord, its finalize Barrier, all the slots in the
     // finalize Barrier, and the job's output Slot.
     JobRecord jobRecord = queryJobOrAbandonTask(jobKey, JobRecord.InflationType.FOR_FINALIZE);
-
+    jobRecord.getQueueSettings().merge(finalizeJobTask.getQueueSettings());
     switch (jobRecord.getState()) {
       case WAITING_TO_FINALIZE:
         // OK, proceed
@@ -1101,12 +1094,10 @@
    * barrier is waiting on are now filled then the barrier should be released.
    * Release the barrier by enqueueing an appropriate task (either
    * {@link RunJobTask} or {@link FinalizeJobTask}.
-   *
-   * @param slotKey The key of the slot that has been filled.
    */
-  private static void handleSlotFilled(Key slotKey) {
-    Slot slot = null;
-    slot = querySlotOrAbandonTask(slotKey, true);
+  private static void handleSlotFilled(HandleSlotFilledTask hsfTask) {
+    Key slotKey = hsfTask.getSlotKey();
+    Slot slot = querySlotOrAbandonTask(slotKey, true);
     List<Barrier> waitingList = slot.getWaitingOnMeInflated();
     if (null == waitingList) {
       throw new RuntimeException("Internal logic error: " + slot + " is not inflated");
@@ -1133,6 +1124,7 @@
         if (shouldBeReleased) {
           Key jobKey = barrier.getJobKey();
           JobRecord jobRecord = queryJobOrAbandonTask(jobKey, JobRecord.InflationType.NONE);
+          jobRecord.getQueueSettings().merge(hsfTask.getQueueSettings());
           Task task;
           switch (barrier.getType()) {
             case RUN:
@@ -1157,17 +1149,16 @@
   /**
    * Fills the slot with null value and calls handleSlotFilled
    */
-  private static void handleFillSlotHandleFilled(
-      FillSlotHandleSlotFilledTask fillSlotHandleFilledTask) {
-    Key slotKey = fillSlotHandleFilledTask.getSlotKey();
+  private static void handleDelayedSlotFill(DelayedSlotFillTask task) {
+    Key slotKey = task.getSlotKey();
     Slot slot = querySlotOrAbandonTask(slotKey, true);
-    Key rootJobKey = fillSlotHandleFilledTask.getRootJobKey();
+    Key rootJobKey = task.getRootJobKey();
     UpdateSpec updateSpec = new UpdateSpec(rootJobKey);
     slot.fill(null);
     updateSpec.getNonTransactionalGroup().includeSlot(slot);
-    backEnd.save(updateSpec, fillSlotHandleFilledTask.getQueueSettings());
+    backEnd.save(updateSpec, task.getQueueSettings());
     // re-reading Slot (in handleSlotFilled) is needed (to capture slot fill after this one)
-    handleSlotFilled(slotKey);
+    handleSlotFilled(new HandleSlotFilledTask(slotKey, task.getQueueSettings()));
   }
 
   /**
@@ -1239,9 +1230,9 @@
    */
   public static void registerDelayedValue(
       UpdateSpec spec, JobRecord generatorJobRecord, long delaySec, Slot slot) {
-    FillSlotHandleSlotFilledTask task = new FillSlotHandleSlotFilledTask(
-        slot.getKey(), generatorJobRecord.getRootJobKey(), generatorJobRecord.getQueueSettings());
-    task.getQueueSettings().setDelayInSeconds(delaySec);
+    Key rootKey = generatorJobRecord.getRootJobKey();
+    QueueSettings queueSettings = generatorJobRecord.getQueueSettings();
+    DelayedSlotFillTask task = new DelayedSlotFillTask(slot, delaySec, rootKey , queueSettings);
     spec.getFinalTransaction().registerTask(task);
   }
 }
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/QueueSettings.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/QueueSettings.java
index 8d86282..78c8951 100644
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/QueueSettings.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/QueueSettings.java
@@ -15,6 +15,7 @@
 
   /**
    * Merge will override any {@code null} setting with a matching setting from {@code other}.
+   * Note, delay value is not being merged.
    */
   public QueueSettings merge(QueueSettings other) {
     if (onBackend == null && onModule == null) {
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java
index 8d55d74..bfcbedb 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java
@@ -26,6 +26,7 @@
 import com.google.appengine.tools.pipeline.impl.QueueSettings;
 import com.google.appengine.tools.pipeline.impl.servlets.TaskHandler;
 import com.google.appengine.tools.pipeline.impl.tasks.Task;
+import com.google.apphosting.api.ApiProxy;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -62,6 +63,10 @@
   }
 
   private static Queue getQueue(String queueName) {
+    if (queueName == null) {
+      Map<String, Object> attributes = ApiProxy.getCurrentEnvironment().getAttributes();
+      queueName = (String) attributes.get(TaskHandler.TASK_QUEUE_NAME_HEADER);
+    }
     return queueName == null ? QueueFactory.getDefaultQueue() : QueueFactory.getQueue(queueName);
   }
 
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/TaskHandler.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/TaskHandler.java
index 9e00bec..1e6e8be 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/TaskHandler.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/TaskHandler.java
@@ -17,8 +17,10 @@
 import com.google.appengine.tools.pipeline.impl.PipelineManager;
 import com.google.appengine.tools.pipeline.impl.tasks.Task;
 import com.google.appengine.tools.pipeline.impl.util.StringUtils;
+import com.google.apphosting.api.ApiProxy;
 
 import java.util.Enumeration;
+import java.util.Map;
 import java.util.Properties;
 import java.util.logging.Logger;
 
@@ -67,9 +69,15 @@
     }
     String taskName = request.getHeader(TASK_NAME_REQUEST_HEADER);
     Task task = Task.fromProperties(taskName, properties);
+    task.getQueueSettings().setDelayInSeconds(null);
     String queueName = request.getHeader(TASK_QUEUE_NAME_HEADER);
-    if (queueName != null && task.getQueueSettings().getOnQueue() == null) {
-      task.getQueueSettings().setOnQueue(queueName);
+    if (queueName != null && !queueName.isEmpty()) {
+      String onQueue = task.getQueueSettings().getOnQueue();
+       if (onQueue == null || onQueue.isEmpty()) {
+         task.getQueueSettings().setOnQueue(queueName);
+       }
+       Map<String, Object> attributes = ApiProxy.getCurrentEnvironment().getAttributes();
+       attributes.put(TASK_QUEUE_NAME_HEADER, queueName);
     }
     return task;
   }
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/tasks/FillSlotHandleSlotFilledTask.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/tasks/DelayedSlotFillTask.java
similarity index 76%
rename from java/src/main/java/com/google/appengine/tools/pipeline/impl/tasks/FillSlotHandleSlotFilledTask.java
rename to java/src/main/java/com/google/appengine/tools/pipeline/impl/tasks/DelayedSlotFillTask.java
index 05b69a3..a712b01 100644
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/tasks/FillSlotHandleSlotFilledTask.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/tasks/DelayedSlotFillTask.java
@@ -17,6 +17,7 @@
 import com.google.appengine.api.datastore.Key;
 import com.google.appengine.api.datastore.KeyFactory;
 import com.google.appengine.tools.pipeline.impl.QueueSettings;
+import com.google.appengine.tools.pipeline.impl.model.Slot;
 
 import java.util.Properties;
 
@@ -26,7 +27,7 @@
  *
  * @author maximf@google.com (Maxim Fateev)
  */
-public class FillSlotHandleSlotFilledTask extends ObjRefTask {
+public class DelayedSlotFillTask extends ObjRefTask {
 
   private static final String ROOT_JOB_KEY_PARAM = "rootJobKey";
 
@@ -37,15 +38,18 @@
    * construct a {@code HandleSlotFilledTask}to be enqueued.
    * <p>
    *
-   * @param slotKey The key of the Slot whose filling is to be handled
+   * @param slot The Slot whose filling is to be handled
+   * @param delay The delay in seconds before task gets executed
    * @param rootJobKey The key of the root job of the pipeline
+   * @param queueSettings The queue settings
    */
-  public FillSlotHandleSlotFilledTask(Key slotKey, Key rootJobKey, QueueSettings queueSettings) {
-    super(Type.FILL_SLOT_HANDLE_SLOT_FILLED, "fillSlotHandleSlotFilled", slotKey, queueSettings);
+  public DelayedSlotFillTask(Slot slot, long delay, Key rootJobKey, QueueSettings queueSettings) {
+    super(Type.DELAYED_SLOT_FILL, "delayedSlotFillTask", slot.getKey(), queueSettings);
+    getQueueSettings().setDelayInSeconds(delay);
     this.rootJobKey = rootJobKey;
   }
 
-  protected FillSlotHandleSlotFilledTask(Type type, String taskName, Properties properties) {
+  protected DelayedSlotFillTask(Type type, String taskName, Properties properties) {
     super(type, taskName, properties);
     rootJobKey = KeyFactory.stringToKey(properties.getProperty(ROOT_JOB_KEY_PARAM));
   }
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/tasks/Task.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/tasks/Task.java
index c7b191b..3391fde 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/tasks/Task.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/tasks/Task.java
@@ -139,7 +139,7 @@
     FINALIZE_JOB(FinalizeJobTask.class),
     FAN_OUT(FanoutTask.class),
     DELETE_PIPELINE(DeletePipelineTask.class),
-    FILL_SLOT_HANDLE_SLOT_FILLED(FillSlotHandleSlotFilledTask.class);
+    DELAYED_SLOT_FILL(DelayedSlotFillTask.class);
 
     private final Constructor<? extends Task> taskConstructor;