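Revert "added method to job to get PromiseValue using handle"

Removes Job#promise(String), PipelineManager#getPromisedValueSlot(String) and
the PromisedValueImpl(Slot) constructor, together with the three
promise-from-handle tests and their helper jobs in MiscPipelineTest. The slot
lookup and generator-job validation that had been split out into
queryPromisedValueSlot/getGeneratorJob are folded back into
PipelineManager#acceptPromisedValue, and OrphanedObjectException is again
raised with the promise handle rather than the slot key string.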
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java
index bf33e76..0c02328 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/Job.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/Job.java
@@ -20,7 +20,6 @@
import com.google.appengine.tools.pipeline.impl.PromisedValueImpl;
import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec;
import com.google.appengine.tools.pipeline.impl.model.JobRecord;
-import com.google.appengine.tools.pipeline.impl.model.Slot;
import java.io.Serializable;
import java.util.List;
@@ -380,49 +379,12 @@
return promisedValue;
}
- /**
- * Invoke this method from within the {@code run} method of a <b>generator
- * job</b> in order to declare that some value will be provided asynchronously
- * by some external agent.
- *
- * @param <F> The type of the asynchronously provided value.
- * @return A {@code PromisedValue} that represents an empty value slot that
- * will be filled at a later time when the external agent invokes
- * {@link PipelineService#submitPromisedValue(String, Object)}. This
- * may be passed in to further invocations of {@code futureCall()} in
- * order to specify a data dependency.
- */
public <F> PromisedValue<F> newPromise() {
PromisedValueImpl<F> promisedValue =
new PromisedValueImpl<>(getPipelineKey(), thisJobRecord.getKey(), currentRunGUID);
updateSpec.getNonTransactionalGroup().includeSlot(promisedValue.getSlot());
return promisedValue;
}
-
- /**
- * Invoke this method from within the {@code run} method of a <b>generator
- * job</b> in order to get a promised value that was created by an ancestor
- * job that will be provided asynchronously by some external agent. This can
- * be used to share the same value with child {@link Job}s
- *
- * @param promiseHandle The unique identifier for the {@link PromisedValue}
- * obtained during the execution of some job via the method
- * {@link PromisedValue#getHandle()}.
- * @return A {@code PromisedValue} that represents an empty value slot that
- * will be filled at a later time when the external agent invokes
- * {@link PipelineService#submitPromisedValue(String, Object)}. This
- * may be passed in to further invocations of {@code futureCall()} in
- * order to specify a data dependency. This method will return
- * <code>null</code> if the slot for the handle could not be found.
- */
- public <F> PromisedValue<F> promise(String promiseHandle) {
- Slot slot = PipelineManager.getPromisedValueSlot(promiseHandle);
- PromisedValueImpl<F> promisedValue = null;
- if (slot != null) {
- promisedValue = new PromisedValueImpl<>(slot);
- }
- return promisedValue;
- }
/**
* Invoke this method from within the {@code run} method of a <b>generator
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
index 758aed4..4e80f57 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
@@ -24,7 +24,6 @@
import com.google.appengine.tools.pipeline.JobSetting;
import com.google.appengine.tools.pipeline.NoSuchObjectException;
import com.google.appengine.tools.pipeline.OrphanedObjectException;
-import com.google.appengine.tools.pipeline.PromisedValue;
import com.google.appengine.tools.pipeline.Value;
import com.google.appengine.tools.pipeline.impl.backend.AppEngineBackEnd;
import com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd;
@@ -414,74 +413,11 @@
backEnd.deletePipeline(key, force, async);
}
- /**
- * Fills a the {@link PromisedValue}.
- * @param promiseHandle the key to the promised value slot.
- * @param value the value to fill.
- * @throws NoSuchObjectException If there is no Job with the given key.
- * @throws OrphanedObjectException If the slot has been orphaned.
- */
public static void acceptPromisedValue(String promiseHandle, Object value)
throws NoSuchObjectException, OrphanedObjectException {
- Slot slot = queryPromisedValueSlot(promiseHandle);
- JobRecord generatorJob = getGeneratorJob(slot);
- UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey());
- registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value);
- backEnd.save(updateSpec, generatorJob.getQueueSettings());
- }
-
- private static JobRecord getGeneratorJob(Slot slot)
- throws OrphanedObjectException, NoSuchObjectException {
- Key generatorJobKey = slot.getGeneratorJobKey();
- if (null == generatorJobKey) {
- throw new RuntimeException(
- "Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot);
- }
- JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE);
- if (null == generatorJob) {
- throw new RuntimeException("Pipeline is fatally corrupted. "
- + "The generator job for a promised value slot was not found: " + generatorJobKey);
- }
- String childGraphGuid = generatorJob.getChildGraphGuid();
- if (null == childGraphGuid) {
- // The generator job has not been saved with a childGraphGuid yet. This can happen if the
- // promise handle leaked out to an external thread before the job that generated it
- // had finished.
- throw new NoSuchObjectException(
- "The framework is not ready to accept the promised value yet. "
- + "Please try again after the job that generated the promis handle has completed.");
- }
- if (!childGraphGuid.equals(slot.getGraphGuid())) {
- // The slot has been orphaned
- throw new OrphanedObjectException(KeyFactory.keyToString(slot.getKey()));
- }
-
- return generatorJob;
- }
-
- /**
- * Get the {@link Slot} for a promise handle that can be used to construct a
- * {@link PromisedValue} for use in child {@link Job}s. It will attempt to lookup
- * the key 5 times with an exponential back-off just in case it has been requested
- * before the promise has been registered.
- * @param promiseHandle Key to find the {@link Slot}
- * @return A {@link Slot} if the handle can be resolved otherwise <code>null</code>.
- */
- public static Slot getPromisedValueSlot(String promiseHandle) {
- Slot slot = null;
- try {
- slot = queryPromisedValueSlot(promiseHandle);
- } catch (Exception e) {
- logger.log(Level.WARNING, "Find slot for promise with handle " + promiseHandle + " failed.", e);
- }
- return slot;
- }
-
- private static Slot queryPromisedValueSlot(String promiseHandle)
- throws NoSuchObjectException {
- Slot slot = null;
checkNonEmpty(promiseHandle, "promiseHandle");
Key key = KeyFactory.stringToKey(promiseHandle);
+ Slot slot = null;
// It is possible, though unlikely, that we might be asked to accept a
// promise before the slot to hold the promise has been saved. We will try 5
// times, sleeping 1, 2, 4, 8 seconds between attempts.
@@ -509,9 +445,34 @@
Thread.currentThread().interrupt();
}
}
- return slot;
+ Key generatorJobKey = slot.getGeneratorJobKey();
+ if (null == generatorJobKey) {
+ throw new RuntimeException(
+ "Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot);
+ }
+ JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE);
+ if (null == generatorJob) {
+ throw new RuntimeException("Pipeline is fatally corrupted. "
+ + "The generator job for a promised value slot was not found: " + generatorJobKey);
+ }
+ String childGraphGuid = generatorJob.getChildGraphGuid();
+ if (null == childGraphGuid) {
+ // The generator job has not been saved with a childGraphGuid yet. This can happen if the
+ // promise handle leaked out to an external thread before the job that generated it
+ // had finished.
+ throw new NoSuchObjectException(
+ "The framework is not ready to accept the promised value yet. "
+ + "Please try again after the job that generated the promis handle has completed.");
+ }
+ if (!childGraphGuid.equals(slot.getGraphGuid())) {
+ // The slot has been orphaned
+ throw new OrphanedObjectException(promiseHandle);
+ }
+ UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey());
+ registerSlotFilled(updateSpec, generatorJob.getQueueSettings(), slot, value);
+ backEnd.save(updateSpec, generatorJob.getQueueSettings());
}
-
+
/**
* The root job instance used to wrap a user provided root if the user
* provided root job has exceptionHandler specified.
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java
index e465d7b..06977f2 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PromisedValueImpl.java
@@ -29,12 +29,8 @@
*/
public class PromisedValueImpl<E> extends FutureValueImpl<E> implements PromisedValue<E> {
- public PromisedValueImpl(Slot slot) {
- super(slot);
- }
-
public PromisedValueImpl(Key rootJobGuid, Key generatorJobKey, String graphGUID) {
- this(new Slot(rootJobGuid, generatorJobKey, graphGUID));
+ super(new Slot(rootJobGuid, generatorJobKey, graphGUID));
}
@Override
diff --git a/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java b/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java
index 4ce9ec3..2e6a19d 100644
--- a/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java
+++ b/java/src/test/java/com/google/appengine/tools/pipeline/MiscPipelineTest.java
@@ -16,7 +16,6 @@
import com.google.appengine.tools.pipeline.JobInfo.State;
import com.google.appengine.tools.pipeline.JobSetting.StatusConsoleUrl;
-import com.google.appengine.tools.pipeline.JobSetting.WaitForSetting;
import com.google.appengine.tools.pipeline.impl.PipelineManager;
import com.google.appengine.tools.pipeline.impl.model.JobRecord;
import com.google.appengine.tools.pipeline.impl.model.PipelineObjects;
@@ -645,88 +644,4 @@
return immediate(bytes.length);
}
}
-
- public void testUnfilledPromiseFromHandle() throws Exception {
- PipelineService service = PipelineServiceFactory.newPipelineService();
- String pipelineId = service.startNewPipeline(new UnfilledPromiseFromHandleParentJob());
- String value = waitForJobToComplete(pipelineId);
- assertEquals("1323", value);
- }
-
- @SuppressWarnings("serial")
- private static class UnfilledPromiseFromHandleParentJob extends Job0<String> {
-
- @Override
- public Value<String> run() throws Exception {
- PromisedValue<String> promise = newPromise();
- FutureValue<String> child1 = futureCall(new PromiseDelegateJob(), immediate("1"),
- immediate(promise.getHandle()));
- FutureValue<String> child2 = futureCall(new PromiseDelegateJob(), immediate("2"),
- immediate(promise.getHandle()));
- futureCall(new FillPromiseJob(), immediate("3"), immediate(promise.getHandle()));
- FutureValue<String> child4 = futureCall(new StringAdderJob(), child1, child2);
- return child4;
- }
- }
-
- @SuppressWarnings("serial")
- private static class PromiseDelegateJob extends Job2<String, String, String> {
- @Override
- public Value<String> run(String value, String handle) {
- PromisedValue<String> promise = promise(handle);
- FutureValue<String> added = futureCall(new StringAdderJob(), immediate(value), promise);
- return added;
- }
- }
-
- @SuppressWarnings("serial")
- private static class StringAdderJob extends Job2<String, String, String> {
- @Override
- public Value<String> run(String value1, String value2) {
- return immediate(value1 + value2);
- }
- }
-
- public void testFilledPromiseFromHandle() throws Exception {
- PipelineService service = PipelineServiceFactory.newPipelineService();
- String pipelineId = service.startNewPipeline(new FilledPromiseFromHandleParentJob());
- String value = waitForJobToComplete(pipelineId);
- assertEquals("1323", value);
- }
-
- @SuppressWarnings("serial")
- private static class FilledPromiseFromHandleParentJob extends Job0<String> {
-
- @Override
- public Value<String> run() throws Exception {
- PromisedValue<String> promise = newPromise();
- WaitForSetting wait = waitFor(futureCall(new FillPromiseJob(), immediate("3"),
- immediate(promise.getHandle())));
-
- FutureValue<String> child1 = futureCall(new PromiseDelegateJob(), immediate("1"),
- immediate(promise.getHandle()), wait);
- FutureValue<String> child2 = futureCall(new PromiseDelegateJob(), immediate("2"),
- immediate(promise.getHandle()), wait);
-
- return futureCall(new StringAdderJob(), child1, child2);
- }
- }
-
- public void testPromiseFromNonExistentHandle() throws Exception {
- PipelineService service = PipelineServiceFactory.newPipelineService();
- String pipelineId = service.startNewPipeline(new PromiseFromNonExistentHandleParentJob());
- Boolean value = waitForJobToComplete(pipelineId);
- assertTrue(value);
- }
-
- @SuppressWarnings("serial")
- private static class PromiseFromNonExistentHandleParentJob extends Job0<Boolean> {
-
- @Override
- public Value<Boolean> run() throws Exception {
- PromisedValue<String> promise = promise("SOME_NON_HANDLE");
- return immediate(promise == null);
- }
- }
-
}