Merge pull request #38 from Khan/fix_zipimport

Add zipimport compatibility to ResourceHandler
diff --git a/.gitignore b/.gitignore
index 5ca0973..9d6e5df 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
 .DS_Store
-
+.idea
+*.iml
diff --git a/java/example/war/WEB-INF/appengine-web.xml b/java/example/war/WEB-INF/appengine-web.xml
index facc604..18615d6 100644
--- a/java/example/war/WEB-INF/appengine-web.xml
+++ b/java/example/war/WEB-INF/appengine-web.xml
@@ -6,6 +6,7 @@
 
   <!-- Configure java.util.logging -->
   <system-properties>
+    <property name="com.google.appengine.tools.pipeline.BASE_URL" value="/pipeline/"/>
     <property name="java.util.logging.config.file" value="WEB-INF/logging.properties"/>
   </system-properties>
 
diff --git a/java/example/war/WEB-INF/functions.tld b/java/example/war/WEB-INF/functions.tld
new file mode 100644
index 0000000..e6876ea
--- /dev/null
+++ b/java/example/war/WEB-INF/functions.tld
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<taglib 
+    xmlns="http://java.sun.com/xml/ns/javaee"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-jsptaglibrary_2_1.xsd"
+    version="2.1">
+
+    <display-name>Custom Functions</display-name>    
+    <tlib-version>1.0</tlib-version>
+    <uri>http://github.com/GoogleCloudPlatform/appengine-pipelines/functions</uri>
+
+    <function>
+        <name>baseUrl</name>
+        <function-class>com.google.appengine.tools.pipeline.impl.servlets.PipelineServlet</function-class>
+        <function-signature>java.lang.String baseUrl()</function-signature>
+    </function>
+</taglib>
diff --git a/java/example/war/WEB-INF/web.xml b/java/example/war/WEB-INF/web.xml
index 3ca30e2..1b16b0e 100644
--- a/java/example/war/WEB-INF/web.xml
+++ b/java/example/war/WEB-INF/web.xml
@@ -10,7 +10,7 @@
   </servlet>
   <servlet-mapping>
     <servlet-name>PipelineServlet</servlet-name>
-    <url-pattern>/_ah/pipeline/*</url-pattern>
+    <url-pattern>/pipeline/*</url-pattern>
   </servlet-mapping>
   <welcome-file-list>
     <welcome-file>index.html</welcome-file>
diff --git a/java/example/war/gcd.jsp b/java/example/war/gcd.jsp
index e564f42..bf06cff 100644
--- a/java/example/war/gcd.jsp
+++ b/java/example/war/gcd.jsp
@@ -1,6 +1,7 @@
 <%@ page import="com.google.appengine.tools.pipeline.*" %>
 <%@ page import="com.google.appengine.tools.pipeline.demo.*" %>
 <%@ page import="com.google.appengine.tools.pipeline.demo.GCDExample.GCDJob" %>
+<%@taglib uri="http://github.com/GoogleCloudPlatform/appengine-pipelines/functions" prefix="f" %>
 
 <%!
     private static final String X_PARAM_NAME = "x";
@@ -133,12 +134,11 @@
     if (null != pipelineId) {
 %>
 <p>
-    <a href="/_ah/pipeline/status.html?root=<%=pipelineId%>" target="Pipeline Status">view status
-        page</a>
-        <%
+  <a href="${f:baseUrl()}status.html?root=<%=pipelineId%>" target="Pipeline Status">view status page</a>
+<%
 }
 %>
 
 
 </BODY>
-</HTML>
\ No newline at end of file
+</HTML>
diff --git a/java/example/war/lettercount.jsp b/java/example/war/lettercount.jsp
index 2e5a8ec..ff3ab2b 100644
--- a/java/example/war/lettercount.jsp
+++ b/java/example/war/lettercount.jsp
@@ -2,6 +2,7 @@
 <%@ page import="com.google.appengine.tools.pipeline.demo.*" %>
 <%@ page import="com.google.appengine.tools.pipeline.demo.LetterCountExample.LetterCounter" %>
 <%@ page import="java.util.SortedMap" %>
+<%@taglib uri="http://github.com/GoogleCloudPlatform/appengine-pipelines/functions" prefix="f" %>
 
 <%!
     private static final String TEXT_PARAM_NAME = "text";
@@ -141,12 +142,11 @@
     if (null != pipelineId) {
 %>
 <p>
-    <a href="/_ah/pipeline/status.html?root=<%=pipelineId%>" target="Pipeline Status">view status
-        page</a>
-        <%
+  <a href="${f:baseUrl()}status.html?root=<%=pipelineId%>" target="Pipeline Status">view status page</a>
+<%
 }
 %>
 
 
 </BODY>
-</HTML>
\ No newline at end of file
+</HTML>
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java
index 77f074a..0c02328 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java
@@ -20,7 +20,6 @@
 import com.google.appengine.tools.pipeline.impl.PromisedValueImpl;
 import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec;
 import com.google.appengine.tools.pipeline.impl.model.JobRecord;
-import com.google.appengine.tools.pipeline.impl.model.Slot;
 
 import java.io.Serializable;
 import java.util.List;
@@ -84,9 +83,9 @@
  * A Job can provide an optional {@code handleException} method that is called
  * when any unhandled exception is thrown from its run method.
  * <p>
- * Before delivering an exception to the job’s handleException method the
+ * Before delivering an exception to the job's handleException method the
  * Pipelines framework cancels all descendants jobs that originated from the
- * parent’s run method. A descendant job is defined as a job that is either a
+ * parent's run method. A descendant job is defined as a job that is either a
  * child or the child of a child (and so on recursively) of the original job.
  * This cancellation is important for a number of reasons.
  * <ul>
@@ -109,7 +108,7 @@
  * A failure of a job that is a descendant of the handleException is handled in
  * the same manner as a failure of a job originated in the run method. All
  * failed job siblings originated in the handleException are cancelled and then
- * exception is propagated to the enclosing scope which is either ancestor’s run
+ * exception is propagated to the enclosing scope which is either ancestor's run
  * or handleException.
  * <p>
  * {@code handleException} methods must have a single argument of type
@@ -380,49 +379,12 @@
     return promisedValue;
   }
 
-  /**
-   * Invoke this method from within the {@code run} method of a <b>generator
-   * job</b> in order to declare that some value will be provided asynchronously
-   * by some external agent.
-   *
-   * @param <F> The type of the asynchronously provided value.
-   * @return A {@code PromisedValue} that represents an empty value slot that
-   *         will be filled at a later time when the external agent invokes
-   *         {@link PipelineService#submitPromisedValue(String, Object)}. This
-   *         may be passed in to further invocations of {@code futureCall()} in
-   *         order to specify a data dependency.
-   */
   public <F> PromisedValue<F> newPromise() {
     PromisedValueImpl<F> promisedValue =
         new PromisedValueImpl<>(getPipelineKey(), thisJobRecord.getKey(), currentRunGUID);
     updateSpec.getNonTransactionalGroup().includeSlot(promisedValue.getSlot());
     return promisedValue;
   }
-  
-  /**
-   * Invoke this method from within the {@code run} method of a <b>generator
-   * job</b> in order to get a promised value that was created by an ancestor
-   * job that will be provided asynchronously by some external agent. This can 
-   * be used to share the same value with child {@link Job}s
-   * 
-   * @param promiseHandle The unique identifier for the {@link PromisedValue}
-   *        obtained during the execution of some job via the method
-   *        {@link PromisedValue#getHandle()}.
-   * @return A {@code PromisedValue} that represents an empty value slot that
-   *         will be filled at a later time when the external agent invokes
-   *         {@link PipelineService#submitPromisedValue(String, Object)}. This
-   *         may be passed in to further invocations of {@code futureCall()} in
-   *         order to specify a data dependency. This method will return 
-   *         <code>null</code> if the slot for the handle could not be found. 
-   */
-  public <F> PromisedValue<F> promise(String promiseHandle) {
-    Slot slot = PipelineManager.getPromisedValueSlot(promiseHandle);
-    PromisedValueImpl<F> promisedValue = null;
-    if (slot != null) {
-      promisedValue = new PromisedValueImpl<>(slot);
-    }
-    return promisedValue;
-  }
 
   /**
    * Invoke this method from within the {@code run} method of a <b>generator
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
index 758aed4..4e80f57 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
@@ -24,7 +24,6 @@
 import com.google.appengine.tools.pipeline.JobSetting;
 import com.google.appengine.tools.pipeline.NoSuchObjectException;
 import com.google.appengine.tools.pipeline.OrphanedObjectException;
-import com.google.appengine.tools.pipeline.PromisedValue;
 import com.google.appengine.tools.pipeline.Value;
 import com.google.appengine.tools.pipeline.impl.backend.AppEngineBackEnd;
 import com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd;
@@ -414,74 +413,11 @@
     backEnd.deletePipeline(key, force, async);
   }
 
-  /**
-   * Fills a the {@link PromisedValue}.
-   * @param promiseHandle the key to the promised value slot.
-   * @param value the value to fill.
-   * @throws NoSuchObjectException If there is no Job with the given key.
-   * @throws OrphanedObjectException If the slot has been orphaned.
-   */
   public static void acceptPromisedValue(String promiseHandle, Object value)
       throws NoSuchObjectException, OrphanedObjectException {
-    Slot slot = queryPromisedValueSlot(promiseHandle);
-    JobRecord generatorJob = getGeneratorJob(slot);
-    UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey());
-    registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value);
-    backEnd.save(updateSpec, generatorJob.getQueueSettings());
-  }
-  
-  private static JobRecord getGeneratorJob(Slot slot)
-      throws OrphanedObjectException, NoSuchObjectException {
-    Key generatorJobKey = slot.getGeneratorJobKey();
-    if (null == generatorJobKey) {
-      throw new RuntimeException(
-          "Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot);
-    }
-    JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE);
-    if (null == generatorJob) {
-      throw new RuntimeException("Pipeline is fatally corrupted. "
-          + "The generator job for a promised value slot was not found: " + generatorJobKey);
-    }
-    String childGraphGuid = generatorJob.getChildGraphGuid();
-    if (null == childGraphGuid) {
-      // The generator job has not been saved with a childGraphGuid yet. This can happen if the
-      // promise handle leaked out to an external thread before the job that generated it
-      // had finished.
-      throw new NoSuchObjectException(
-          "The framework is not ready to accept the promised value yet. "
-          + "Please try again after the job that generated the promis handle has completed.");
-    }
-    if (!childGraphGuid.equals(slot.getGraphGuid())) {
-      // The slot has been orphaned
-      throw new OrphanedObjectException(KeyFactory.keyToString(slot.getKey()));
-    }
-    
-    return generatorJob;
-  }
-
-  /**
-   * Get the {@link Slot} for a promise handle that can be used to construct a
-   * {@link PromisedValue} for use in child {@link Job}s. It will attempt to lookup
-   * the key 5 times with an exponential back-off just in case it has been requested
-   * before the promise has been registered. 
-   * @param promiseHandle Key to find the {@link Slot}
-   * @return A {@link Slot} if the handle can be resolved otherwise <code>null</code>.
-   */
-  public static Slot getPromisedValueSlot(String promiseHandle) {
-    Slot slot = null;
-    try {
-      slot = queryPromisedValueSlot(promiseHandle);
-    } catch (Exception e) {
-      logger.log(Level.WARNING, "Find slot for promise with handle " + promiseHandle + " failed.", e);
-    }
-    return slot;
-  }
-  
-  private static Slot queryPromisedValueSlot(String promiseHandle)
-      throws NoSuchObjectException {
-    Slot slot = null;
     checkNonEmpty(promiseHandle, "promiseHandle");
     Key key = KeyFactory.stringToKey(promiseHandle);
+    Slot slot = null;
     // It is possible, though unlikely, that we might be asked to accept a
     // promise before the slot to hold the promise has been saved. We will try 5
     // times, sleeping 1, 2, 4, 8 seconds between attempts.
@@ -509,9 +445,34 @@
         Thread.currentThread().interrupt();
       }
     }
-    return slot;
+    Key generatorJobKey = slot.getGeneratorJobKey();
+    if (null == generatorJobKey) {
+      throw new RuntimeException(
+          "Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot);
+    }
+    JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE);
+    if (null == generatorJob) {
+      throw new RuntimeException("Pipeline is fatally corrupted. "
+          + "The generator job for a promised value slot was not found: " + generatorJobKey);
+    }
+    String childGraphGuid = generatorJob.getChildGraphGuid();
+    if (null == childGraphGuid) {
+      // The generator job has not been saved with a childGraphGuid yet. This can happen if the
+      // promise handle leaked out to an external thread before the job that generated it
+      // had finished.
+      throw new NoSuchObjectException(
+          "The framework is not ready to accept the promised value yet. "
+          + "Please try again after the job that generated the promis handle has completed.");
+    }
+    if (!childGraphGuid.equals(slot.getGraphGuid())) {
+      // The slot has been orphaned
+      throw new OrphanedObjectException(promiseHandle);
+    }
+    UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey());
+    registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value);
+    backEnd.save(updateSpec, generatorJob.getQueueSettings());
   }
-  
+
   /**
    * The root job instance used to wrap a user provided root if the user
    * provided root job has exceptionHandler specified.
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java
index e465d7b..06977f2 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java
@@ -29,12 +29,8 @@
  */
 public class PromisedValueImpl<E> extends FutureValueImpl<E> implements PromisedValue<E> {
 
-  public PromisedValueImpl(Slot slot) {
-    super(slot);
-  }
-  
   public PromisedValueImpl(Key rootJobGuid, Key generatorJobKey, String graphGUID) {
-    this(new Slot(rootJobGuid, generatorJobKey, graphGUID));
+    super(new Slot(rootJobGuid, generatorJobKey, graphGUID));
   }
 
   @Override
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java
index c318e78..d72f660 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java
@@ -136,7 +136,8 @@
 
   private TaskOptions toTaskOptions(Task task) {
     final QueueSettings queueSettings = task.getQueueSettings();
-    TaskOptions taskOptions = TaskOptions.Builder.withUrl(TaskHandler.HANDLE_TASK_URL);
+
+    TaskOptions taskOptions = TaskOptions.Builder.withUrl(TaskHandler.handleTaskUrl());
     if (queueSettings.getOnBackend() != null) {
       taskOptions.header("Host", BackendServiceFactory.getBackendService().getBackendAddress(
           queueSettings.getOnBackend()));
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java
index 1469827..601a1ad 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java
@@ -35,12 +35,23 @@
 @SuppressWarnings("serial")
 public class PipelineServlet extends HttpServlet {
 
-  // This must match the URL in web.xml
-  public static final String BASE_URL = "/_ah/pipeline/";
+  public static final String BASE_URL_PROPERTY = "com.google.appengine.tools.pipeline.BASE_URL";
+  public static final String BASE_URL = baseUrl();
+
+  /**
+   * Returns the Pipeline's BASE URL.
+   * This must match the URL in web.xml
+   */
+  public static String baseUrl() {
+    String baseURL =  System.getProperty(BASE_URL_PROPERTY, "/_ah/pipeline/");
+    if (!baseURL.endsWith("/")) {
+      baseURL += "/";
+    }
+    return baseURL;
+  }
 
   public static String makeViewerUrl(Key rootJobKey, Key jobKey) {
-    // TODO(user): BASE_URL could be replaced with ServletContext#getContextPath
-    return BASE_URL + "status.html?root=" + rootJobKey.getName() + "#pipeline-" + jobKey.getName();
+    return baseUrl() + "status.html?root=" + rootJobKey.getName() + "#pipeline-" + jobKey.getName();
   }
 
   private static enum RequestType {
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/TaskHandler.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/TaskHandler.java
index 1e6e8be..dc99d9a 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/TaskHandler.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/TaskHandler.java
@@ -37,12 +37,15 @@
   private static Logger logger = Logger.getLogger(TaskHandler.class.getName());
 
   public static final String PATH_COMPONENT = "handleTask";
-  public static final String HANDLE_TASK_URL = PipelineServlet.BASE_URL + PATH_COMPONENT;
-
   public static final String TASK_NAME_REQUEST_HEADER = "X-AppEngine-TaskName";
   public static final String TASK_RETRY_COUNT_HEADER = "X-AppEngine-TaskRetryCount";
   public static final String TASK_QUEUE_NAME_HEADER = "X-AppEngine-QueueName";
 
+
+  public static String handleTaskUrl() {
+    return PipelineServlet.baseUrl() + PATH_COMPONENT;
+  }
+
   public static void doPost(HttpServletRequest req) throws ServletException {
     Task task = reconstructTask(req);
     int retryCount;
diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java
index 4ce9ec3..2e6a19d 100644
--- a/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java
+++ b/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java
@@ -16,7 +16,6 @@
 
 import com.google.appengine.tools.pipeline.JobInfo.State;
 import com.google.appengine.tools.pipeline.JobSetting.StatusConsoleUrl;
-import com.google.appengine.tools.pipeline.JobSetting.WaitForSetting;
 import com.google.appengine.tools.pipeline.impl.PipelineManager;
 import com.google.appengine.tools.pipeline.impl.model.JobRecord;
 import com.google.appengine.tools.pipeline.impl.model.PipelineObjects;
@@ -645,88 +644,4 @@
       return immediate(bytes.length);
     }
   }
-  
-  public void testUnfilledPromiseFromHandle() throws Exception {
-    PipelineService service = PipelineServiceFactory.newPipelineService();
-    String pipelineId = service.startNewPipeline(new UnfilledPromiseFromHandleParentJob());
-    String value = waitForJobToComplete(pipelineId);
-    assertEquals("1323", value);
-  }
-
-  @SuppressWarnings("serial")
-  private static class UnfilledPromiseFromHandleParentJob extends Job0<String> {
-
-    @Override
-    public Value<String> run() throws Exception {
-      PromisedValue<String> promise = newPromise();
-      FutureValue<String> child1 = futureCall(new PromiseDelegateJob(), immediate("1"),
-          immediate(promise.getHandle()));
-      FutureValue<String> child2 = futureCall(new PromiseDelegateJob(), immediate("2"),
-          immediate(promise.getHandle()));
-      futureCall(new FillPromiseJob(), immediate("3"), immediate(promise.getHandle()));
-      FutureValue<String> child4 = futureCall(new StringAdderJob(), child1, child2);
-      return child4;
-    }
-  }
-
-  @SuppressWarnings("serial")
-  private static class PromiseDelegateJob extends Job2<String, String, String> {
-    @Override
-    public Value<String> run(String value, String handle) {
-      PromisedValue<String> promise = promise(handle);
-      FutureValue<String> added = futureCall(new StringAdderJob(), immediate(value), promise);
-      return added;
-    }
-  }
-
-  @SuppressWarnings("serial")
-  private static class StringAdderJob extends Job2<String, String, String> {
-    @Override
-    public Value<String> run(String value1, String value2) {
-      return immediate(value1 + value2);
-    }
-  }
-
-  public void testFilledPromiseFromHandle() throws Exception {
-    PipelineService service = PipelineServiceFactory.newPipelineService();
-    String pipelineId = service.startNewPipeline(new FilledPromiseFromHandleParentJob());
-    String value = waitForJobToComplete(pipelineId);
-    assertEquals("1323", value);
-  }
-
-  @SuppressWarnings("serial")
-  private static class FilledPromiseFromHandleParentJob extends Job0<String> {
-
-    @Override
-    public Value<String> run() throws Exception {
-      PromisedValue<String> promise = newPromise();
-      WaitForSetting wait = waitFor(futureCall(new FillPromiseJob(), immediate("3"),
-          immediate(promise.getHandle())));
-
-      FutureValue<String> child1 = futureCall(new PromiseDelegateJob(), immediate("1"),
-          immediate(promise.getHandle()), wait);
-      FutureValue<String> child2 = futureCall(new PromiseDelegateJob(), immediate("2"),
-          immediate(promise.getHandle()), wait);
-
-      return futureCall(new StringAdderJob(), child1, child2);
-    }
-  }
-
-  public void testPromiseFromNonExistentHandle() throws Exception {
-    PipelineService service = PipelineServiceFactory.newPipelineService();
-    String pipelineId = service.startNewPipeline(new PromiseFromNonExistentHandleParentJob());
-    Boolean value = waitForJobToComplete(pipelineId);
-    assertTrue(value);
-  }
-
-  @SuppressWarnings("serial")
-  private static class PromiseFromNonExistentHandleParentJob extends Job0<Boolean> {
-
-    @Override
-    public Value<Boolean> run() throws Exception {
-      PromisedValue<String> promise = promise("SOME_NON_HANDLE");
-      return immediate(promise == null);
-    }
-  }
-  
 }
diff --git a/python/build.sh b/python/build.sh
new file mode 100755
index 0000000..29e802b
--- /dev/null
+++ b/python/build.sh
@@ -0,0 +1,83 @@
+#!/bin/bash
+#
+# Copyright 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+dir=`dirname $0`
+demo_dir=`pwd`
+
+test () {
+  if [ -z $APPENGINE_LIB ]; then
+    echo "APPENGINE_LIB environment variable shoud be defined and should point to appengine sdk folder"
+    exit 1
+  fi
+
+  export PYTHONPATH="${PYTHONPATH}:\
+$APPENGINE_LIB:\
+$APPENGINE_LIB/lib/fancy_urllib:\
+$APPENGINE_LIB/lib/webob-1.1.1:\
+$APPENGINE_LIB/lib/yaml/lib:\
+$dir/src:\
+$dir/test:\
+"
+  fetch_dependencies
+  echo "Using PYTHONPATH=$PYTHONPATH"
+  exit_status=0
+  for t in $(find "$dir/test" -name "*test.py"); do
+    if python $t
+    then
+      echo "PASSED"
+    else
+      echo "FAILED"
+      ((exit_status++))
+    fi
+  done
+
+  echo "----------------------------------------------------------------------"
+  if [ $exit_status -ne 0 ];
+  then
+    echo "FAILED $exit_status tests"
+  else
+    echo "PASSED all tests"
+  fi
+  exit $exit_status
+}
+
+fetch_dependencies() {
+  if [ ! `which pip` ]
+  then
+    echo "pip not found. pip is required to install dependencies."
+    exit 1;
+  fi
+  # Work around https://github.com/pypa/pip/issues/1356
+  for dep in `cat $dir/src/todelete.txt`
+  do
+      rm -r $dir/src/$dep $dir/src/$dep*-info 2>/dev/null
+  done
+
+  pip install --exists-action=s -r $dir/src/requirements.txt -t $dir/src/ --upgrade || exit 1
+  pip install --exists-action=s -r $dir/src/requirements.txt -t $dir/demo/ --upgrade || exit 1
+}
+
+case "$1" in
+  test)
+    test
+    ;;
+  deps)
+    fetch_dependencies
+    ;;
+  *)
+    echo $"Usage: $0 {test|deps}"
+    exit 1
+esac
diff --git a/python/src/pipeline/pipeline.py b/python/src/pipeline/pipeline.py
index a40cd49..573623a 100755
--- a/python/src/pipeline/pipeline.py
+++ b/python/src/pipeline/pipeline.py
@@ -1201,6 +1201,11 @@
 
   def __enter__(self):
     """When entering a 'with' block."""
+    # Reentrancy checking gives false errors in test mode since everything is
+    # on the same thread, and all pipelines are executed in order in test mode
+    # anyway, so disable InOrder for tests.
+    if _TEST_MODE:
+        return
     InOrder._thread_init()
     if InOrder._local._activated:
       raise UnexpectedPipelineError('Already in an InOrder "with" block.')
@@ -1209,6 +1214,8 @@
 
   def __exit__(self, type, value, trace):
     """When exiting a 'with' block."""
+    if _TEST_MODE:
+        return
     InOrder._local._activated = False
     InOrder._local._in_order_futures.clear()
     return False
@@ -1233,20 +1240,26 @@
 
 def _write_json_blob(encoded_value, pipeline_id=None):
   """Writes a JSON encoded value to a Cloud Storage File.
-  
+
   This function will store the blob in a GCS file in the default bucket under
   the appengine_pipeline directory. Optionally using another directory level
   specified by pipeline_id
   Args:
     encoded_value: The encoded JSON string.
-    pipeline_id: A pipeline id to segment files in Cloud Storage, if none, 
+    pipeline_id: A pipeline id to segment files in Cloud Storage, if none,
       the file will be created under appengine_pipeline
 
   Returns:
     The blobstore.BlobKey for the file that was created.
   """
-  
+
   default_bucket = app_identity.get_default_gcs_bucket_name()
+  if default_bucket is None:
+    raise Exception(
+      "No default cloud storage bucket has been set for this application. "
+      "This app was likely created before v1.9.0, please see: "
+      "https://cloud.google.com/appengine/docs/php/googlestorage/setup")
+
   path_components = ['/', default_bucket, "appengine_pipeline"]
   if pipeline_id:
     path_components.append(pipeline_id)
@@ -1574,8 +1587,7 @@
         continue
       blocking_slot_dict[slot_record.key()] = slot_record
 
-    task_list = []
-    updated_barriers = []
+    barriers_to_trigger = []
     for barrier in results:
       ready_slots = []
       for blocking_slot_key in barrier.blocking_slots:
@@ -1594,32 +1606,44 @@
       # the task name tombstones.
       pending_slots = set(barrier.blocking_slots) - set(ready_slots)
       if not pending_slots:
-        if barrier.status != _BarrierRecord.FIRED:
-          barrier.status = _BarrierRecord.FIRED
-          barrier.trigger_time = self._gettime()
-          updated_barriers.append(barrier)
-
-        purpose = barrier.key().name()
-        if purpose == _BarrierRecord.START:
-          path = self.pipeline_handler_path
-          countdown = None
-        else:
-          path = self.finalized_handler_path
-          # NOTE: Wait one second before finalization to prevent
-          # contention on the _PipelineRecord entity.
-          countdown = 1
-        pipeline_key = _BarrierRecord.target.get_value_for_datastore(barrier)
-        logging.debug('Firing barrier %r', barrier.key())
-        task_list.append(taskqueue.Task(
-            url=path,
-            countdown=countdown,
-            name='ae-barrier-fire-%s-%s' % (pipeline_key.name(), purpose),
-            params=dict(pipeline_key=pipeline_key, purpose=purpose),
-            headers={'X-Ae-Pipeline-Key': pipeline_key}))
+        barriers_to_trigger.append(barrier)
       else:
         logging.debug('Not firing barrier %r, Waiting for slots: %r',
                       barrier.key(), pending_slots)
 
+    pipeline_keys_to_trigger = [
+        _BarrierRecord.target.get_value_for_datastore(barrier)
+        for barrier in barriers_to_trigger]
+    pipelines_to_trigger = dict(zip(
+        pipeline_keys_to_trigger, db.get(pipeline_keys_to_trigger)))
+    task_list = []
+    updated_barriers = []
+    for barrier in barriers_to_trigger:
+      if barrier.status != _BarrierRecord.FIRED:
+        barrier.status = _BarrierRecord.FIRED
+        barrier.trigger_time = self._gettime()
+        updated_barriers.append(barrier)
+
+      purpose = barrier.key().name()
+      if purpose == _BarrierRecord.START:
+        path = self.pipeline_handler_path
+        countdown = None
+      else:
+        path = self.finalized_handler_path
+        # NOTE: Wait one second before finalization to prevent
+        # contention on the _PipelineRecord entity.
+        countdown = 1
+      pipeline_key = _BarrierRecord.target.get_value_for_datastore(barrier)
+      target = pipelines_to_trigger[pipeline_key].params.get('target')
+      logging.debug('Firing barrier %r', barrier.key())
+      task_list.append(taskqueue.Task(
+          url=path,
+          countdown=countdown,
+          name='ae-barrier-fire-%s-%s' % (pipeline_key.name(), purpose),
+          params=dict(pipeline_key=pipeline_key, purpose=purpose),
+          headers={'X-Ae-Pipeline-Key': pipeline_key},
+          target=target))
+
     # Blindly overwrite _BarrierRecords that have an updated status. This is
     # acceptable because by this point all finalization barriers for
     # generator children should have already had their final outputs assigned.
@@ -1933,16 +1957,6 @@
       else:
         # Generator yielded no children, so treat it as a sync function.
         stage.outputs.default._set_value_test(stage._pipeline_key, None)
-
-      # Enforce the policy of requiring all undeclared output slots from
-      # child pipelines to be consumed by their parent generator.
-      for slot in all_output_slots:
-        if slot.name == 'default':
-          continue
-        if slot.filled and not slot._strict and not slot._touched:
-          raise SlotNotDeclaredError(
-              'Undeclared output "%s"; all dynamic outputs from child '
-              'pipelines must be consumed.' % slot.name)
     else:
       try:
         result = stage.run_test(*stage.args, **stage.kwargs)
@@ -2594,7 +2608,8 @@
             params=dict(pipeline_key=pipeline_key,
                         purpose=_BarrierRecord.START,
                         attempt=pipeline_record.current_attempt),
-            headers={'X-Ae-Pipeline-Key': pipeline_key})
+            headers={'X-Ae-Pipeline-Key': pipeline_key},
+            target=pipeline_record.params.get('target'))
         task.add(queue_name=self.queue_name, transactional=True)
 
       pipeline_record.put()
@@ -2712,7 +2727,7 @@
       all_tasks.append(taskqueue.Task(
           url=context.pipeline_handler_path,
           params=dict(pipeline_key=pipeline_key),
-          target=child_pipeline.params['target'],
+          target=child_pipeline.params.get('target'),
           headers={'X-Ae-Pipeline-Key': pipeline_key},
           name='ae-pipeline-fan-out-' + child_pipeline.key().name()))
 
@@ -2901,6 +2916,7 @@
       outputs: Dictionary of output slot dictionaries.
       children: List of child pipeline IDs.
       queueName: Queue on which this pipeline is running.
+      target: Target version/module for the pipeline.
       afterSlotKeys: List of Slot Ids after which this pipeline runs.
       currentAttempt: Number of the current attempt, starting at 1.
       maxAttempts: Maximum number of attempts before aborting.
@@ -2969,6 +2985,7 @@
     'outputs': params['output_slots'].copy(),
     'children': [key.name() for key in pipeline_record.fanned_out],
     'queueName': params['queue_name'],
+    'target': params['target'],
     'afterSlotKeys': [str(key) for key in params['after_all']],
     'currentAttempt': pipeline_record.current_attempt + 1,
     'maxAttempts': pipeline_record.max_attempts,
diff --git a/python/src/pipeline/ui/status.css b/python/src/pipeline/ui/status.css
index f4d5105..1b581b6 100644
--- a/python/src/pipeline/ui/status.css
+++ b/python/src/pipeline/ui/status.css
@@ -83,6 +83,7 @@
 
 /* detail-specific styling */
 .status-param,
+.target-param,
 .retry-param {
   padding-left: 1em;
   font-size: 0.85em;
@@ -110,6 +111,7 @@
 #detail .param-container,
 #detail .child-container,
 #detail .run-after-container,
+#detail .status-target-params,
 #detail .status-retry-params {
   margin-top: 1em;
 }
diff --git a/python/src/pipeline/ui/status.js b/python/src/pipeline/ui/status.js
index eca6da5..bd7fd72 100644
--- a/python/src/pipeline/ui/status.js
+++ b/python/src/pipeline/ui/status.js
@@ -326,6 +326,29 @@
     containerDiv.append(linksDiv);
   }
 
+  // Target parameters.
+  if (!sidebar) {
+    var targetParamsDiv = $('<div class="status-target-params">');
+    targetParamsDiv.append(
+        $('<div class="target-params-title">').text('Target parameters'));
+
+    var queueNameDiv = $('<div class="target-param">');
+    $('<span>').text('Queue name: ').appendTo(queueNameDiv);
+    $('<span>')
+        .text(infoMap.queueName)
+        .appendTo(queueNameDiv);
+    targetParamsDiv.append(queueNameDiv);
+
+    var targetDiv = $('<div class="target-param">');
+    $('<span>').text('Target: ').appendTo(targetDiv);
+    $('<span>')
+        .text(infoMap.target || 'Unspecified')
+        .appendTo(targetDiv);
+    targetParamsDiv.append(targetDiv);
+
+    containerDiv.append(targetParamsDiv);
+  }
+
   // Retry parameters.
   if (!sidebar) {
     var retryParamsDiv = $('<div class="status-retry-params">');
diff --git a/python/src/pipeline/util.py b/python/src/pipeline/util.py
index 62eba10..96f28e8 100755
--- a/python/src/pipeline/util.py
+++ b/python/src/pipeline/util.py
@@ -32,6 +32,8 @@
 except ImportError:
   import simplejson as json
 
+from google.appengine.ext import ndb
+
 # pylint: disable=protected-access
 
 
@@ -229,3 +231,17 @@
 _register_json_primitive(datetime.datetime,
                          _json_encode_datetime,
                          _json_decode_datetime)
+
+# ndb.Key
+def _JsonEncodeKey(o):
+    """Json encode an ndb.Key object."""
+    return {'key_string': o.urlsafe()}
+
+def _JsonDecodeKey(d):
+    """Json decode an ndb.Key object."""
+    k_c = d['key_string']
+    if isinstance(k_c, (list, tuple)):
+        return ndb.Key(flat=k_c)
+    return ndb.Key(urlsafe=d['key_string'])
+
+_register_json_primitive(ndb.Key, _JsonEncodeKey, _JsonDecodeKey)
diff --git a/python/src/todelete.txt b/python/src/todelete.txt
new file mode 100644
index 0000000..4e94b58
--- /dev/null
+++ b/python/src/todelete.txt
@@ -0,0 +1,2 @@
+GoogleAppEngineCloudStorageClient
+cloudstorage
diff --git a/python/test/common_test.py b/python/test/common_test.py
index db86eb9..8be6ae5 100755
--- a/python/test/common_test.py
+++ b/python/test/common_test.py
@@ -27,13 +27,12 @@
 from pipeline import common
 from pipeline import pipeline
 import test_shared
-from appengine_pipeline.test import testutil
+import testutil
 
 
-class CommonTest(test_shared.TaskRunningMixin, unittest.TestCase):
+class CommonTest(testutil.TestSetupMixin, test_shared.TaskRunningMixin, unittest.TestCase):
 
   def setUp(self):
-    testutil.setup_for_testing()
     super(CommonTest, self).setUp()
 
   def testReturn(self):