Merge pull request #38 from Khan/fix_zipimport
Add zipimport compatibility to ResourceHandler
diff --git a/.gitignore b/.gitignore
index 5ca0973..9d6e5df 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
.DS_Store
-
+.idea
+*.iml
diff --git a/java/example/war/WEB-INF/appengine-web.xml b/java/example/war/WEB-INF/appengine-web.xml
index facc604..18615d6 100644
--- a/java/example/war/WEB-INF/appengine-web.xml
+++ b/java/example/war/WEB-INF/appengine-web.xml
@@ -6,6 +6,7 @@
<!-- Configure java.util.logging -->
<system-properties>
+ <property name="com.google.appengine.tools.pipeline.BASE_URL" value="/pipeline/"/>
<property name="java.util.logging.config.file" value="WEB-INF/logging.properties"/>
</system-properties>
diff --git a/java/example/war/WEB-INF/functions.tld b/java/example/war/WEB-INF/functions.tld
new file mode 100644
index 0000000..e6876ea
--- /dev/null
+++ b/java/example/war/WEB-INF/functions.tld
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<taglib
+ xmlns="http://java.sun.com/xml/ns/javaee"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-jsptaglibrary_2_1.xsd"
+ version="2.1">
+
+ <display-name>Custom Functions</display-name>
+ <tlib-version>1.0</tlib-version>
+ <uri>http://github.com/GoogleCloudPlatform/appengine-pipelines/functions</uri>
+
+ <function>
+ <name>baseUrl</name>
+ <function-class>com.google.appengine.tools.pipeline.impl.servlets.PipelineServlet</function-class>
+ <function-signature>java.lang.String baseUrl()</function-signature>
+ </function>
+</taglib>
diff --git a/java/example/war/WEB-INF/web.xml b/java/example/war/WEB-INF/web.xml
index 3ca30e2..1b16b0e 100644
--- a/java/example/war/WEB-INF/web.xml
+++ b/java/example/war/WEB-INF/web.xml
@@ -10,7 +10,7 @@
</servlet>
<servlet-mapping>
<servlet-name>PipelineServlet</servlet-name>
- <url-pattern>/_ah/pipeline/*</url-pattern>
+ <url-pattern>/pipeline/*</url-pattern>
</servlet-mapping>
<welcome-file-list>
<welcome-file>index.html</welcome-file>
diff --git a/java/example/war/gcd.jsp b/java/example/war/gcd.jsp
index e564f42..bf06cff 100644
--- a/java/example/war/gcd.jsp
+++ b/java/example/war/gcd.jsp
@@ -1,6 +1,7 @@
<%@ page import="com.google.appengine.tools.pipeline.*" %>
<%@ page import="com.google.appengine.tools.pipeline.demo.*" %>
<%@ page import="com.google.appengine.tools.pipeline.demo.GCDExample.GCDJob" %>
+<%@taglib uri="http://github.com/GoogleCloudPlatform/appengine-pipelines/functions" prefix="f" %>
<%!
private static final String X_PARAM_NAME = "x";
@@ -133,12 +134,11 @@
if (null != pipelineId) {
%>
<p>
- <a href="/_ah/pipeline/status.html?root=<%=pipelineId%>" target="Pipeline Status">view status
- page</a>
- <%
+ <a href="${f:baseUrl()}status.html?root=<%=pipelineId%>" target="Pipeline Status">view status page</a>
+<%
}
%>
</BODY>
-</HTML>
\ No newline at end of file
+</HTML>
diff --git a/java/example/war/lettercount.jsp b/java/example/war/lettercount.jsp
index 2e5a8ec..ff3ab2b 100644
--- a/java/example/war/lettercount.jsp
+++ b/java/example/war/lettercount.jsp
@@ -2,6 +2,7 @@
<%@ page import="com.google.appengine.tools.pipeline.demo.*" %>
<%@ page import="com.google.appengine.tools.pipeline.demo.LetterCountExample.LetterCounter" %>
<%@ page import="java.util.SortedMap" %>
+<%@taglib uri="http://github.com/GoogleCloudPlatform/appengine-pipelines/functions" prefix="f" %>
<%!
private static final String TEXT_PARAM_NAME = "text";
@@ -141,12 +142,11 @@
if (null != pipelineId) {
%>
<p>
- <a href="/_ah/pipeline/status.html?root=<%=pipelineId%>" target="Pipeline Status">view status
- page</a>
- <%
+ <a href="${f:baseUrl()}status.html?root=<%=pipelineId%>" target="Pipeline Status">view status page</a>
+<%
}
%>
</BODY>
-</HTML>
\ No newline at end of file
+</HTML>
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java
index 77f074a..0c02328 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java
@@ -20,7 +20,6 @@
import com.google.appengine.tools.pipeline.impl.PromisedValueImpl;
import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec;
import com.google.appengine.tools.pipeline.impl.model.JobRecord;
-import com.google.appengine.tools.pipeline.impl.model.Slot;
import java.io.Serializable;
import java.util.List;
@@ -84,9 +83,9 @@
* A Job can provide an optional {@code handleException} method that is called
* when any unhandled exception is thrown from its run method.
* <p>
- * Before delivering an exception to the job’s handleException method the
+ * Before delivering an exception to the job's handleException method the
* Pipelines framework cancels all descendants jobs that originated from the
- * parent’s run method. A descendant job is defined as a job that is either a
+ * parent's run method. A descendant job is defined as a job that is either a
* child or the child of a child (and so on recursively) of the original job.
* This cancellation is important for a number of reasons.
* <ul>
@@ -109,7 +108,7 @@
* A failure of a job that is a descendant of the handleException is handled in
* the same manner as a failure of a job originated in the run method. All
* failed job siblings originated in the handleException are cancelled and then
- * exception is propagated to the enclosing scope which is either ancestor’s run
+ * exception is propagated to the enclosing scope which is either ancestor's run
* or handleException.
* <p>
* {@code handleException} methods must have a single argument of type
@@ -380,49 +379,12 @@
return promisedValue;
}
- /**
- * Invoke this method from within the {@code run} method of a <b>generator
- * job</b> in order to declare that some value will be provided asynchronously
- * by some external agent.
- *
- * @param <F> The type of the asynchronously provided value.
- * @return A {@code PromisedValue} that represents an empty value slot that
- * will be filled at a later time when the external agent invokes
- * {@link PipelineService#submitPromisedValue(String, Object)}. This
- * may be passed in to further invocations of {@code futureCall()} in
- * order to specify a data dependency.
- */
public <F> PromisedValue<F> newPromise() {
PromisedValueImpl<F> promisedValue =
new PromisedValueImpl<>(getPipelineKey(), thisJobRecord.getKey(), currentRunGUID);
updateSpec.getNonTransactionalGroup().includeSlot(promisedValue.getSlot());
return promisedValue;
}
-
- /**
- * Invoke this method from within the {@code run} method of a <b>generator
- * job</b> in order to get a promised value that was created by an ancestor
- * job that will be provided asynchronously by some external agent. This can
- * be used to share the same value with child {@link Job}s
- *
- * @param promiseHandle The unique identifier for the {@link PromisedValue}
- * obtained during the execution of some job via the method
- * {@link PromisedValue#getHandle()}.
- * @return A {@code PromisedValue} that represents an empty value slot that
- * will be filled at a later time when the external agent invokes
- * {@link PipelineService#submitPromisedValue(String, Object)}. This
- * may be passed in to further invocations of {@code futureCall()} in
- * order to specify a data dependency. This method will return
- * <code>null</code> if the slot for the handle could not be found.
- */
- public <F> PromisedValue<F> promise(String promiseHandle) {
- Slot slot = PipelineManager.getPromisedValueSlot(promiseHandle);
- PromisedValueImpl<F> promisedValue = null;
- if (slot != null) {
- promisedValue = new PromisedValueImpl<>(slot);
- }
- return promisedValue;
- }
/**
* Invoke this method from within the {@code run} method of a <b>generator
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
index 758aed4..4e80f57 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
@@ -24,7 +24,6 @@
import com.google.appengine.tools.pipeline.JobSetting;
import com.google.appengine.tools.pipeline.NoSuchObjectException;
import com.google.appengine.tools.pipeline.OrphanedObjectException;
-import com.google.appengine.tools.pipeline.PromisedValue;
import com.google.appengine.tools.pipeline.Value;
import com.google.appengine.tools.pipeline.impl.backend.AppEngineBackEnd;
import com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd;
@@ -414,74 +413,11 @@
backEnd.deletePipeline(key, force, async);
}
- /**
- * Fills a the {@link PromisedValue}.
- * @param promiseHandle the key to the promised value slot.
- * @param value the value to fill.
- * @throws NoSuchObjectException If there is no Job with the given key.
- * @throws OrphanedObjectException If the slot has been orphaned.
- */
public static void acceptPromisedValue(String promiseHandle, Object value)
throws NoSuchObjectException, OrphanedObjectException {
- Slot slot = queryPromisedValueSlot(promiseHandle);
- JobRecord generatorJob = getGeneratorJob(slot);
- UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey());
- registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value);
- backEnd.save(updateSpec, generatorJob.getQueueSettings());
- }
-
- private static JobRecord getGeneratorJob(Slot slot)
- throws OrphanedObjectException, NoSuchObjectException {
- Key generatorJobKey = slot.getGeneratorJobKey();
- if (null == generatorJobKey) {
- throw new RuntimeException(
- "Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot);
- }
- JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE);
- if (null == generatorJob) {
- throw new RuntimeException("Pipeline is fatally corrupted. "
- + "The generator job for a promised value slot was not found: " + generatorJobKey);
- }
- String childGraphGuid = generatorJob.getChildGraphGuid();
- if (null == childGraphGuid) {
- // The generator job has not been saved with a childGraphGuid yet. This can happen if the
- // promise handle leaked out to an external thread before the job that generated it
- // had finished.
- throw new NoSuchObjectException(
- "The framework is not ready to accept the promised value yet. "
- + "Please try again after the job that generated the promis handle has completed.");
- }
- if (!childGraphGuid.equals(slot.getGraphGuid())) {
- // The slot has been orphaned
- throw new OrphanedObjectException(KeyFactory.keyToString(slot.getKey()));
- }
-
- return generatorJob;
- }
-
- /**
- * Get the {@link Slot} for a promise handle that can be used to construct a
- * {@link PromisedValue} for use in child {@link Job}s. It will attempt to lookup
- * the key 5 times with an exponential back-off just in case it has been requested
- * before the promise has been registered.
- * @param promiseHandle Key to find the {@link Slot}
- * @return A {@link Slot} if the handle can be resolved otherwise <code>null</code>.
- */
- public static Slot getPromisedValueSlot(String promiseHandle) {
- Slot slot = null;
- try {
- slot = queryPromisedValueSlot(promiseHandle);
- } catch (Exception e) {
- logger.log(Level.WARNING, "Find slot for promise with handle " + promiseHandle + " failed.", e);
- }
- return slot;
- }
-
- private static Slot queryPromisedValueSlot(String promiseHandle)
- throws NoSuchObjectException {
- Slot slot = null;
checkNonEmpty(promiseHandle, "promiseHandle");
Key key = KeyFactory.stringToKey(promiseHandle);
+ Slot slot = null;
// It is possible, though unlikely, that we might be asked to accept a
// promise before the slot to hold the promise has been saved. We will try 5
// times, sleeping 1, 2, 4, 8 seconds between attempts.
@@ -509,9 +445,34 @@
Thread.currentThread().interrupt();
}
}
- return slot;
+ Key generatorJobKey = slot.getGeneratorJobKey();
+ if (null == generatorJobKey) {
+ throw new RuntimeException(
+ "Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot);
+ }
+ JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE);
+ if (null == generatorJob) {
+ throw new RuntimeException("Pipeline is fatally corrupted. "
+ + "The generator job for a promised value slot was not found: " + generatorJobKey);
+ }
+ String childGraphGuid = generatorJob.getChildGraphGuid();
+ if (null == childGraphGuid) {
+ // The generator job has not been saved with a childGraphGuid yet. This can happen if the
+ // promise handle leaked out to an external thread before the job that generated it
+ // had finished.
+ throw new NoSuchObjectException(
+ "The framework is not ready to accept the promised value yet. "
+ + "Please try again after the job that generated the promis handle has completed.");
+ }
+ if (!childGraphGuid.equals(slot.getGraphGuid())) {
+ // The slot has been orphaned
+ throw new OrphanedObjectException(promiseHandle);
+ }
+ UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey());
+ registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value);
+ backEnd.save(updateSpec, generatorJob.getQueueSettings());
}
-
+
/**
* The root job instance used to wrap a user provided root if the user
* provided root job has exceptionHandler specified.
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java
index e465d7b..06977f2 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java
@@ -29,12 +29,8 @@
*/
public class PromisedValueImpl<E> extends FutureValueImpl<E> implements PromisedValue<E> {
- public PromisedValueImpl(Slot slot) {
- super(slot);
- }
-
public PromisedValueImpl(Key rootJobGuid, Key generatorJobKey, String graphGUID) {
- this(new Slot(rootJobGuid, generatorJobKey, graphGUID));
+ super(new Slot(rootJobGuid, generatorJobKey, graphGUID));
}
@Override
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java
index c318e78..d72f660 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineTaskQueue.java
@@ -136,7 +136,8 @@
private TaskOptions toTaskOptions(Task task) {
final QueueSettings queueSettings = task.getQueueSettings();
- TaskOptions taskOptions = TaskOptions.Builder.withUrl(TaskHandler.HANDLE_TASK_URL);
+
+ TaskOptions taskOptions = TaskOptions.Builder.withUrl(TaskHandler.handleTaskUrl());
if (queueSettings.getOnBackend() != null) {
taskOptions.header("Host", BackendServiceFactory.getBackendService().getBackendAddress(
queueSettings.getOnBackend()));
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java
index 1469827..601a1ad 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java
@@ -35,12 +35,23 @@
@SuppressWarnings("serial")
public class PipelineServlet extends HttpServlet {
- // This must match the URL in web.xml
- public static final String BASE_URL = "/_ah/pipeline/";
+ public static final String BASE_URL_PROPERTY = "com.google.appengine.tools.pipeline.BASE_URL";
+ public static final String BASE_URL = baseUrl();
+
+ /**
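+ * Returns the pipeline base URL from the BASE_URL_PROPERTY system property (default "/_ah/pipeline/"), always ending with "/".
+ * This must match the servlet url-pattern in web.xml.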
+ */
+ public static String baseUrl() {
+ String baseURL = System.getProperty(BASE_URL_PROPERTY, "/_ah/pipeline/");
+ if (!baseURL.endsWith("/")) {
+ baseURL += "/";
+ }
+ return baseURL;
+ }
public static String makeViewerUrl(Key rootJobKey, Key jobKey) {
- // TODO(user): BASE_URL could be replaced with ServletContext#getContextPath
- return BASE_URL + "status.html?root=" + rootJobKey.getName() + "#pipeline-" + jobKey.getName();
+ return baseUrl() + "status.html?root=" + rootJobKey.getName() + "#pipeline-" + jobKey.getName();
}
private static enum RequestType {
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/TaskHandler.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/TaskHandler.java
index 1e6e8be..dc99d9a 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/TaskHandler.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/TaskHandler.java
@@ -37,12 +37,15 @@
private static Logger logger = Logger.getLogger(TaskHandler.class.getName());
public static final String PATH_COMPONENT = "handleTask";
- public static final String HANDLE_TASK_URL = PipelineServlet.BASE_URL + PATH_COMPONENT;
-
public static final String TASK_NAME_REQUEST_HEADER = "X-AppEngine-TaskName";
public static final String TASK_RETRY_COUNT_HEADER = "X-AppEngine-TaskRetryCount";
public static final String TASK_QUEUE_NAME_HEADER = "X-AppEngine-QueueName";
+
+ public static String handleTaskUrl() {
+ return PipelineServlet.baseUrl() + PATH_COMPONENT;
+ }
+
public static void doPost(HttpServletRequest req) throws ServletException {
Task task = reconstructTask(req);
int retryCount;
diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java
index 4ce9ec3..2e6a19d 100644
--- a/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java
+++ b/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java
@@ -16,7 +16,6 @@
import com.google.appengine.tools.pipeline.JobInfo.State;
import com.google.appengine.tools.pipeline.JobSetting.StatusConsoleUrl;
-import com.google.appengine.tools.pipeline.JobSetting.WaitForSetting;
import com.google.appengine.tools.pipeline.impl.PipelineManager;
import com.google.appengine.tools.pipeline.impl.model.JobRecord;
import com.google.appengine.tools.pipeline.impl.model.PipelineObjects;
@@ -645,88 +644,4 @@
return immediate(bytes.length);
}
}
-
- public void testUnfilledPromiseFromHandle() throws Exception {
- PipelineService service = PipelineServiceFactory.newPipelineService();
- String pipelineId = service.startNewPipeline(new UnfilledPromiseFromHandleParentJob());
- String value = waitForJobToComplete(pipelineId);
- assertEquals("1323", value);
- }
-
- @SuppressWarnings("serial")
- private static class UnfilledPromiseFromHandleParentJob extends Job0<String> {
-
- @Override
- public Value<String> run() throws Exception {
- PromisedValue<String> promise = newPromise();
- FutureValue<String> child1 = futureCall(new PromiseDelegateJob(), immediate("1"),
- immediate(promise.getHandle()));
- FutureValue<String> child2 = futureCall(new PromiseDelegateJob(), immediate("2"),
- immediate(promise.getHandle()));
- futureCall(new FillPromiseJob(), immediate("3"), immediate(promise.getHandle()));
- FutureValue<String> child4 = futureCall(new StringAdderJob(), child1, child2);
- return child4;
- }
- }
-
- @SuppressWarnings("serial")
- private static class PromiseDelegateJob extends Job2<String, String, String> {
- @Override
- public Value<String> run(String value, String handle) {
- PromisedValue<String> promise = promise(handle);
- FutureValue<String> added = futureCall(new StringAdderJob(), immediate(value), promise);
- return added;
- }
- }
-
- @SuppressWarnings("serial")
- private static class StringAdderJob extends Job2<String, String, String> {
- @Override
- public Value<String> run(String value1, String value2) {
- return immediate(value1 + value2);
- }
- }
-
- public void testFilledPromiseFromHandle() throws Exception {
- PipelineService service = PipelineServiceFactory.newPipelineService();
- String pipelineId = service.startNewPipeline(new FilledPromiseFromHandleParentJob());
- String value = waitForJobToComplete(pipelineId);
- assertEquals("1323", value);
- }
-
- @SuppressWarnings("serial")
- private static class FilledPromiseFromHandleParentJob extends Job0<String> {
-
- @Override
- public Value<String> run() throws Exception {
- PromisedValue<String> promise = newPromise();
- WaitForSetting wait = waitFor(futureCall(new FillPromiseJob(), immediate("3"),
- immediate(promise.getHandle())));
-
- FutureValue<String> child1 = futureCall(new PromiseDelegateJob(), immediate("1"),
- immediate(promise.getHandle()), wait);
- FutureValue<String> child2 = futureCall(new PromiseDelegateJob(), immediate("2"),
- immediate(promise.getHandle()), wait);
-
- return futureCall(new StringAdderJob(), child1, child2);
- }
- }
-
- public void testPromiseFromNonExistentHandle() throws Exception {
- PipelineService service = PipelineServiceFactory.newPipelineService();
- String pipelineId = service.startNewPipeline(new PromiseFromNonExistentHandleParentJob());
- Boolean value = waitForJobToComplete(pipelineId);
- assertTrue(value);
- }
-
- @SuppressWarnings("serial")
- private static class PromiseFromNonExistentHandleParentJob extends Job0<Boolean> {
-
- @Override
- public Value<Boolean> run() throws Exception {
- PromisedValue<String> promise = promise("SOME_NON_HANDLE");
- return immediate(promise == null);
- }
- }
-
}
diff --git a/python/build.sh b/python/build.sh
new file mode 100755
index 0000000..29e802b
--- /dev/null
+++ b/python/build.sh
@@ -0,0 +1,83 @@
+#!/bin/bash
+#
+# Copyright 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+dir=`dirname $0`
+demo_dir=`pwd`
+
+test () {
+ if [ -z $APPENGINE_LIB ]; then
+ echo "APPENGINE_LIB environment variable shoud be defined and should point to appengine sdk folder"
+ exit 1
+ fi
+
+ export PYTHONPATH="${PYTHONPATH}:\
+$APPENGINE_LIB:\
+$APPENGINE_LIB/lib/fancy_urllib:\
+$APPENGINE_LIB/lib/webob-1.1.1:\
+$APPENGINE_LIB/lib/yaml/lib:\
+$dir/src:\
+$dir/test:\
+"
+ fetch_dependencies
+ echo "Using PYTHONPATH=$PYTHONPATH"
+ exit_status=0
+ for t in $(find "$dir/test" -name "*test.py"); do
+ if python $t
+ then
+ echo "PASSED"
+ else
+ echo "FAILED"
+ ((exit_status++))
+ fi
+ done
+
+ echo "----------------------------------------------------------------------"
+ if [ $exit_status -ne 0 ];
+ then
+ echo "FAILED $exit_status tests"
+ else
+ echo "PASSED all tests"
+ fi
+ exit $exit_status
+}
+
+fetch_dependencies() {
+ if [ ! `which pip` ]
+ then
+ echo "pip not found. pip is required to install dependencies."
+ exit 1;
+ fi
+ # Work around https://github.com/pypa/pip/issues/1356
+ for dep in `cat $dir/src/todelete.txt`
+ do
+ rm -r $dir/src/$dep $dir/src/$dep*-info 2>/dev/null
+ done
+
+ pip install --exists-action=s -r $dir/src/requirements.txt -t $dir/src/ --upgrade || exit 1
+ pip install --exists-action=s -r $dir/src/requirements.txt -t $dir/demo/ --upgrade || exit 1
+}
+
+case "$1" in
+ test)
+ test
+ ;;
+ deps)
+ fetch_dependencies
+ ;;
+ *)
+ echo $"Usage: $0 {test|deps}"
+ exit 1
+esac
diff --git a/python/src/pipeline/pipeline.py b/python/src/pipeline/pipeline.py
index a40cd49..573623a 100755
--- a/python/src/pipeline/pipeline.py
+++ b/python/src/pipeline/pipeline.py
@@ -1201,6 +1201,11 @@
def __enter__(self):
"""When entering a 'with' block."""
+ # Reentrancy checking gives false errors in test mode since everything is
+ # on the same thread, and all pipelines are executed in order in test mode
+ # anyway, so disable InOrder for tests.
+ if _TEST_MODE:
+ return
InOrder._thread_init()
if InOrder._local._activated:
raise UnexpectedPipelineError('Already in an InOrder "with" block.')
@@ -1209,6 +1214,8 @@
def __exit__(self, type, value, trace):
"""When exiting a 'with' block."""
+ if _TEST_MODE:
+ return
InOrder._local._activated = False
InOrder._local._in_order_futures.clear()
return False
@@ -1233,20 +1240,26 @@
def _write_json_blob(encoded_value, pipeline_id=None):
"""Writes a JSON encoded value to a Cloud Storage File.
-
+
This function will store the blob in a GCS file in the default bucket under
the appengine_pipeline directory. Optionally using another directory level
specified by pipeline_id
Args:
encoded_value: The encoded JSON string.
- pipeline_id: A pipeline id to segment files in Cloud Storage, if none,
+ pipeline_id: A pipeline id to segment files in Cloud Storage, if none,
the file will be created under appengine_pipeline
Returns:
The blobstore.BlobKey for the file that was created.
"""
-
+
default_bucket = app_identity.get_default_gcs_bucket_name()
+ if default_bucket is None:
+ raise Exception(
+ "No default cloud storage bucket has been set for this application. "
+ "This app was likely created before v1.9.0, please see: "
+ "https://cloud.google.com/appengine/docs/php/googlestorage/setup")
+
path_components = ['/', default_bucket, "appengine_pipeline"]
if pipeline_id:
path_components.append(pipeline_id)
@@ -1574,8 +1587,7 @@
continue
blocking_slot_dict[slot_record.key()] = slot_record
- task_list = []
- updated_barriers = []
+ barriers_to_trigger = []
for barrier in results:
ready_slots = []
for blocking_slot_key in barrier.blocking_slots:
@@ -1594,32 +1606,44 @@
# the task name tombstones.
pending_slots = set(barrier.blocking_slots) - set(ready_slots)
if not pending_slots:
- if barrier.status != _BarrierRecord.FIRED:
- barrier.status = _BarrierRecord.FIRED
- barrier.trigger_time = self._gettime()
- updated_barriers.append(barrier)
-
- purpose = barrier.key().name()
- if purpose == _BarrierRecord.START:
- path = self.pipeline_handler_path
- countdown = None
- else:
- path = self.finalized_handler_path
- # NOTE: Wait one second before finalization to prevent
- # contention on the _PipelineRecord entity.
- countdown = 1
- pipeline_key = _BarrierRecord.target.get_value_for_datastore(barrier)
- logging.debug('Firing barrier %r', barrier.key())
- task_list.append(taskqueue.Task(
- url=path,
- countdown=countdown,
- name='ae-barrier-fire-%s-%s' % (pipeline_key.name(), purpose),
- params=dict(pipeline_key=pipeline_key, purpose=purpose),
- headers={'X-Ae-Pipeline-Key': pipeline_key}))
+ barriers_to_trigger.append(barrier)
else:
logging.debug('Not firing barrier %r, Waiting for slots: %r',
barrier.key(), pending_slots)
+ pipeline_keys_to_trigger = [
+ _BarrierRecord.target.get_value_for_datastore(barrier)
+ for barrier in barriers_to_trigger]
+ pipelines_to_trigger = dict(zip(
+ pipeline_keys_to_trigger, db.get(pipeline_keys_to_trigger)))
+ task_list = []
+ updated_barriers = []
+ for barrier in barriers_to_trigger:
+ if barrier.status != _BarrierRecord.FIRED:
+ barrier.status = _BarrierRecord.FIRED
+ barrier.trigger_time = self._gettime()
+ updated_barriers.append(barrier)
+
+ purpose = barrier.key().name()
+ if purpose == _BarrierRecord.START:
+ path = self.pipeline_handler_path
+ countdown = None
+ else:
+ path = self.finalized_handler_path
+ # NOTE: Wait one second before finalization to prevent
+ # contention on the _PipelineRecord entity.
+ countdown = 1
+ pipeline_key = _BarrierRecord.target.get_value_for_datastore(barrier)
+ target = pipelines_to_trigger[pipeline_key].params.get('target')
+ logging.debug('Firing barrier %r', barrier.key())
+ task_list.append(taskqueue.Task(
+ url=path,
+ countdown=countdown,
+ name='ae-barrier-fire-%s-%s' % (pipeline_key.name(), purpose),
+ params=dict(pipeline_key=pipeline_key, purpose=purpose),
+ headers={'X-Ae-Pipeline-Key': pipeline_key},
+ target=target))
+
# Blindly overwrite _BarrierRecords that have an updated status. This is
# acceptable because by this point all finalization barriers for
# generator children should have already had their final outputs assigned.
@@ -1933,16 +1957,6 @@
else:
# Generator yielded no children, so treat it as a sync function.
stage.outputs.default._set_value_test(stage._pipeline_key, None)
-
- # Enforce the policy of requiring all undeclared output slots from
- # child pipelines to be consumed by their parent generator.
- for slot in all_output_slots:
- if slot.name == 'default':
- continue
- if slot.filled and not slot._strict and not slot._touched:
- raise SlotNotDeclaredError(
- 'Undeclared output "%s"; all dynamic outputs from child '
- 'pipelines must be consumed.' % slot.name)
else:
try:
result = stage.run_test(*stage.args, **stage.kwargs)
@@ -2594,7 +2608,8 @@
params=dict(pipeline_key=pipeline_key,
purpose=_BarrierRecord.START,
attempt=pipeline_record.current_attempt),
- headers={'X-Ae-Pipeline-Key': pipeline_key})
+ headers={'X-Ae-Pipeline-Key': pipeline_key},
+ target=pipeline_record.params.get('target'))
task.add(queue_name=self.queue_name, transactional=True)
pipeline_record.put()
@@ -2712,7 +2727,7 @@
all_tasks.append(taskqueue.Task(
url=context.pipeline_handler_path,
params=dict(pipeline_key=pipeline_key),
- target=child_pipeline.params['target'],
+ target=child_pipeline.params.get('target'),
headers={'X-Ae-Pipeline-Key': pipeline_key},
name='ae-pipeline-fan-out-' + child_pipeline.key().name()))
@@ -2901,6 +2916,7 @@
outputs: Dictionary of output slot dictionaries.
children: List of child pipeline IDs.
queueName: Queue on which this pipeline is running.
+ target: Target version/module for the pipeline.
afterSlotKeys: List of Slot Ids after which this pipeline runs.
currentAttempt: Number of the current attempt, starting at 1.
maxAttempts: Maximum number of attempts before aborting.
@@ -2969,6 +2985,7 @@
'outputs': params['output_slots'].copy(),
'children': [key.name() for key in pipeline_record.fanned_out],
'queueName': params['queue_name'],
+ 'target': params['target'],
'afterSlotKeys': [str(key) for key in params['after_all']],
'currentAttempt': pipeline_record.current_attempt + 1,
'maxAttempts': pipeline_record.max_attempts,
diff --git a/python/src/pipeline/ui/status.css b/python/src/pipeline/ui/status.css
index f4d5105..1b581b6 100644
--- a/python/src/pipeline/ui/status.css
+++ b/python/src/pipeline/ui/status.css
@@ -83,6 +83,7 @@
/* detail-specific styling */
.status-param,
+.target-param,
.retry-param {
padding-left: 1em;
font-size: 0.85em;
@@ -110,6 +111,7 @@
#detail .param-container,
#detail .child-container,
#detail .run-after-container,
+#detail .status-target-params,
#detail .status-retry-params {
margin-top: 1em;
}
diff --git a/python/src/pipeline/ui/status.js b/python/src/pipeline/ui/status.js
index eca6da5..bd7fd72 100644
--- a/python/src/pipeline/ui/status.js
+++ b/python/src/pipeline/ui/status.js
@@ -326,6 +326,29 @@
containerDiv.append(linksDiv);
}
+ // Target parameters.
+ if (!sidebar) {
+ var targetParamsDiv = $('<div class="status-target-params">');
+ targetParamsDiv.append(
+ $('<div class="target-params-title">').text('Target parameters'));
+
+ var queueNameDiv = $('<div class="target-param">');
+ $('<span>').text('Queue name: ').appendTo(queueNameDiv);
+ $('<span>')
+ .text(infoMap.queueName)
+ .appendTo(queueNameDiv);
+ targetParamsDiv.append(queueNameDiv);
+
+ var targetDiv = $('<div class="target-param">');
+ $('<span>').text('Target: ').appendTo(targetDiv);
+ $('<span>')
+ .text(infoMap.target || 'Unspecified')
+ .appendTo(targetDiv);
+ targetParamsDiv.append(targetDiv);
+
+ containerDiv.append(targetParamsDiv);
+ }
+
// Retry parameters.
if (!sidebar) {
var retryParamsDiv = $('<div class="status-retry-params">');
diff --git a/python/src/pipeline/util.py b/python/src/pipeline/util.py
index 62eba10..96f28e8 100755
--- a/python/src/pipeline/util.py
+++ b/python/src/pipeline/util.py
@@ -32,6 +32,8 @@
except ImportError:
import simplejson as json
+from google.appengine.ext import ndb
+
# pylint: disable=protected-access
@@ -229,3 +231,17 @@
_register_json_primitive(datetime.datetime,
_json_encode_datetime,
_json_decode_datetime)
+
+# ndb.Key
+def _JsonEncodeKey(o):
+ """Json encode an ndb.Key object."""
+ return {'key_string': o.urlsafe()}
+
+def _JsonDecodeKey(d):
+ """Json decode a ndb.Key object."""
+ k_c = d['key_string']
+ if isinstance(k_c, (list, tuple)):
+ return ndb.Key(flat=k_c)
+ return ndb.Key(urlsafe=d['key_string'])
+
+_register_json_primitive(ndb.Key, _JsonEncodeKey, _JsonDecodeKey)
diff --git a/python/src/todelete.txt b/python/src/todelete.txt
new file mode 100644
index 0000000..4e94b58
--- /dev/null
+++ b/python/src/todelete.txt
@@ -0,0 +1,2 @@
+GoogleAppEngineCloudStorageClient
+cloudstorage
diff --git a/python/test/common_test.py b/python/test/common_test.py
index db86eb9..8be6ae5 100755
--- a/python/test/common_test.py
+++ b/python/test/common_test.py
@@ -27,13 +27,12 @@
from pipeline import common
from pipeline import pipeline
import test_shared
-from appengine_pipeline.test import testutil
+import testutil
-class CommonTest(test_shared.TaskRunningMixin, unittest.TestCase):
+class CommonTest(testutil.TestSetupMixin, test_shared.TaskRunningMixin, unittest.TestCase):
def setUp(self):
- testutil.setup_for_testing()
super(CommonTest, self).setUp()
def testReturn(self):