// Source-viewer residue (not part of the program):
// blob: 2e6a19d83fe542b6644b815578058e6ddbc2d183 [file] [log] [blame]
// Copyright 2011 Google Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Misc tests including:
 * <ul>
 * <li>Passing large values.
 * <li>Delay used in a slow job.
 * <li>JobSetting inheritance.
 * <li>PromisedValue (and filling it more than once).
 * <li>Cancel pipeline.
 * <li>WaitFor passed to a new pipeline.
 * <li>FutureValue passed to a new pipeline.
 * </ul>
 *
 * @author Mitch Rudominer
 */
public class MiscPipelineTest extends PipelineTest {
// Large payload shared by the tests and jobs in this class; setUp() refills it
// with 2,000,000 random longs before every test.
private static long[] largeValue;
public void setUp() throws Exception {
largeValue = new long[2000000];
Random random = new Random();
for (int i = 0; i < largeValue.length; i++) {
largeValue[i] = random.nextLong();
protected boolean isHrdSafe() {
return false;
private static class ReturnFutureListJob extends Job0<List<String>> {
public Value<List<String>> run() throws Exception {
FutureValue<String> child1 = futureCall(new StrJob<>(), immediate(Long.valueOf(123)));
FutureValue<String> child2 = futureCall(new StrJob<>(), immediate(Long.valueOf(456)));
return new FutureList<>(ImmutableList.of(child1, child2));
public void testReturnFutureList() throws Exception {
PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new ReturnFutureListJob());
List<String> value = waitForJobToComplete(pipelineId);
assertEquals(ImmutableList.of("123", "456"), value);
private static class StringToLong implements Function<String, Long>, Serializable {
private static final long serialVersionUID = -913828405842203610L;
public Long apply(String input) {
return Long.valueOf(input);
private static class TestTransformJob extends Job0<Long> {
public Value<Long> run() {
FutureValue<String> child1 = futureCall(new StrJob<>(), immediate(Long.valueOf(123)));
return futureCall(new Jobs.Transform<>(new StringToLong()), child1);
public void testTransform() throws Exception {
PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new TestTransformJob());
Long value = waitForJobToComplete(pipelineId);
assertEquals(Long.valueOf(123), value);
private static class RootJob extends Job0<String> {
private final boolean delete;
RootJob(boolean delete) {
this.delete = delete;
public Value<String> run() throws Exception {
FutureValue<String> child1 = futureCall(new StrJob<>(), immediate(Long.valueOf(123)));
FutureValue<String> child2 = futureCall(new StrJob<>(), immediate(Long.valueOf(456)));
if (delete) {
return Jobs.waitForAllAndDelete(this, child1, child2);
} else {
return Jobs.waitForAll(this, child1, child2);
public void testWaitForAll() throws Exception {
PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new RootJob(false));
JobInfo jobInfo = waitUntilJobComplete(pipelineId);
assertEquals(JobInfo.State.COMPLETED_SUCCESSFULLY, jobInfo.getJobState());
assertEquals("123", jobInfo.getOutput());
public void testWaitForAllAndDelete() throws Exception {
PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new RootJob(true));
JobInfo jobInfo = waitUntilJobComplete(pipelineId);
assertEquals(JobInfo.State.COMPLETED_SUCCESSFULLY, jobInfo.getJobState());
assertEquals("123", jobInfo.getOutput());
try {
fail("Was expecting a NoSuchObjectException exception");
} catch (NoSuchObjectException expected) {
// expected;
private static class CallerJob extends Job0<String> {
static AtomicBoolean flag = new AtomicBoolean();
public Value<String> run() throws Exception {
FutureValue<Void> child1 = futureCall(new ChildJob());
FutureValue<String> child2 = futureCall(new StrJob<>(), immediate(Long.valueOf(123)));
PipelineService service = PipelineServiceFactory.newPipelineService();
String str1 = service.startNewPipeline(new EchoJob<>(), child2);
String str2 = service.startNewPipeline(new CalledJob(), waitFor(child1));
return immediate(str1 + "," + str2);
private static class EchoJob<T> extends Job1<T, T> {
public Value<T> run(T t) throws Exception {
return immediate(t);
private static class ChildJob extends Job0<Void> {
public Value<Void> run() throws Exception {
return null;
private static class CalledJob extends Job0<Boolean> {
public Value<Boolean> run() throws Exception {
return immediate(CallerJob.flag.get());
public void testWaitForUsedByNewPipeline() throws Exception {
PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new CallerJob());
JobInfo jobInfo = waitUntilJobComplete(pipelineId);
assertEquals(JobInfo.State.COMPLETED_SUCCESSFULLY, jobInfo.getJobState());
String[] calledPipelines = ((String) jobInfo.getOutput()).split(",");
jobInfo = waitUntilJobComplete(calledPipelines[0]);
assertEquals(JobInfo.State.COMPLETED_SUCCESSFULLY, jobInfo.getJobState());
assertEquals("123", jobInfo.getOutput());
jobInfo = waitUntilJobComplete(calledPipelines[1]);
assertEquals(JobInfo.State.COMPLETED_SUCCESSFULLY, jobInfo.getJobState());
assertTrue((Boolean) jobInfo.getOutput());
private abstract static class AbstractJob extends Job0<String> {
public Value<String> run() throws Exception {
return immediate(getValue());
protected abstract String getValue();
private static class ConcreteJob extends AbstractJob {
protected String getValue() {
return "Shalom";
public String getJobDisplayName() {
return "ConcreteJob: " + getValue();
public Value<String> handleException(Throwable t) {
return immediate("Got exception!");
public void testGetJobDisplayName() throws Exception {
PipelineService service = PipelineServiceFactory.newPipelineService();
ConcreteJob job = new ConcreteJob();
String pipelineId = service.startNewPipeline(job);
JobRecord jobRecord = PipelineManager.getJob(pipelineId);
assertEquals(job.getJobDisplayName(), jobRecord.getRootJobDisplayName());
JobInfo jobInfo = waitUntilJobComplete(pipelineId);
assertEquals("Shalom", jobInfo.getOutput());
jobRecord = PipelineManager.getJob(pipelineId);
assertEquals(job.getJobDisplayName(), jobRecord.getRootJobDisplayName());
PipelineObjects pobjects = PipelineManager.queryFullPipeline(pipelineId);
assertEquals(job.getJobDisplayName(), pobjects.rootJob.getRootJobDisplayName());
public void testJobInheritence() throws Exception {
PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new ConcreteJob());
JobInfo jobInfo = waitUntilJobComplete(pipelineId);
assertEquals("Shalom", jobInfo.getOutput());
private static class FailedJob extends Job0<String> {
public Value<String> run() throws Exception {
throw new RuntimeException("koko");
public void testJobFailure() throws Exception {
PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new FailedJob());
JobInfo jobInfo = waitUntilJobComplete(pipelineId);
assertEquals(JobInfo.State.STOPPED_BY_ERROR, jobInfo.getJobState());
assertEquals("koko", jobInfo.getException().getMessage());
public void testReturnValue() throws Exception {
// Testing that return value from parent is always after all children complete
// which is not the case right now. This this *SHOULD* change after we fix
// it, as fixing it should cause a dead-lock.
// see b/12249138
PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new ReturnValueParentJob());
String value = waitForJobToComplete(pipelineId);
assertEquals("bla", value);
private static class ReturnValueParentJob extends Job0<String> {
static CountDownLatch latch1 = new CountDownLatch(1);
static CountDownLatch latch2 = new CountDownLatch(1);
public Value<String> run() throws Exception {
futureCall(new ReturnedValueChildJob());
return immediate("bla");
private static class ReturnedValueChildJob extends Job0<Void> {
public Value<Void> run() throws Exception {
return null;
public void testSubmittingPromisedValueMoreThanOnce() throws Exception {
PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new SubmitPromisedParentJob());
String value = waitForJobToComplete(pipelineId);
assertEquals("2", value);
private static class SubmitPromisedParentJob extends Job0<String> {
public Value<String> run() throws Exception {
PromisedValue<String> promise = newPromise();
FutureValue<Void> child1 = futureCall(
new FillPromiseJob(), immediate("1"), immediate(promise.getHandle()));
FutureValue<Void> child2 = futureCall(
new FillPromiseJob(), immediate("2"), immediate(promise.getHandle()), waitFor(child1));
FutureValue<String> child3 = futureCall(new ReadPromiseJob(), promise, waitFor(child2));
// If we return promise directly then the value would be "1" rather than "2", see b/12216307
//return promise;
return child3;
private static class ReadPromiseJob extends Job1<String, String> {
public Value<String> run(String value) {
return immediate(value);
private static class FillPromiseJob extends Job2<Void, String, String> {
public Value<Void> run(String value, String handle) throws Exception {
PipelineServiceFactory.newPipelineService().submitPromisedValue(handle, value);
return null;
public void testCancelPipeline() throws Exception {
PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new HandleExceptionParentJob(),
new JobSetting.BackoffSeconds(1), new JobSetting.BackoffFactor(1),
new JobSetting.MaxAttempts(2));
JobInfo jobInfo = service.getJobInfo(pipelineId);
assertEquals(State.RUNNING, jobInfo.getJobState());
jobInfo = service.getJobInfo(pipelineId);
assertEquals(State.CANCELED_BY_REQUEST, jobInfo.getJobState());
// Unexpected callbacks (should be fixed after b/12250957)
assertTrue(HandleExceptionParentJob.child3Cancel.get()); // job not started
assertTrue(HandleExceptionParentJob.child4Cancel.get()); // job not started
// expected callbacks
assertFalse(HandleExceptionParentJob.child0Cancel.get()); // job already finalized
assertTrue(HandleExceptionParentJob.child2Cancel.get()); // after job run, but not finalized
private static class HandleExceptionParentJob extends Job0<String> {
private static AtomicBoolean parentCancel = new AtomicBoolean();
private static AtomicBoolean child0 = new AtomicBoolean();
private static AtomicBoolean child0Cancel = new AtomicBoolean();
private static AtomicBoolean child1 = new AtomicBoolean();
private static AtomicBoolean child1Cancel = new AtomicBoolean();
private static AtomicBoolean child2 = new AtomicBoolean();
private static AtomicBoolean child2Cancel = new AtomicBoolean();
private static AtomicBoolean child3 = new AtomicBoolean();
private static AtomicBoolean child3Cancel = new AtomicBoolean();
private static AtomicBoolean child4 = new AtomicBoolean();
private static AtomicBoolean child4Cancel = new AtomicBoolean();
public Value<String> run() {
FutureValue<String> child0 = futureCall(new HandleExceptionChild0Job());
FutureValue<String> child1 = futureCall(new HandleExceptionChild1Job(), waitFor(child0));
FutureValue<String> child2 = futureCall(new HandleExceptionChild3Job(), waitFor(child1));
return futureCall(new HandleExceptionChild4Job(), child1, child2);
public Value<String> handleException(CancellationException ex) throws Exception {
return immediate("should not be used");
private static class HandleExceptionChild0Job extends Job0<String> {
public Value<String> run() {
return immediate("1");
public Value<String> handleException(CancellationException ex) {
return null;
private static class HandleExceptionChild1Job extends Job0<String> {
public Value<String> run() {
return futureCall(new HandleExceptionChild2Job());
public Value<String> handleException(CancellationException ex) {
return null;
private static class HandleExceptionChild2Job extends Job0<String> {
private static CountDownLatch childLatch1 = new CountDownLatch(1);
private static CountDownLatch childLatch2 = new CountDownLatch(1);
public Value<String> run() throws InterruptedException {
return immediate("1");
public Value<String> handleException(CancellationException ex) {
return null;
private static class HandleExceptionChild3Job extends Job0<String> {
public Value<String> run() {
return immediate("2");
public Value<String> handleException(CancellationException ex) {
return null;
private static class HandleExceptionChild4Job extends Job2<String, String, String> {
public Value<String> run(String str1, String str2) {
return immediate(str1 + str2);
public Value<String> handleException(CancellationException ex) {
return immediate("not going to be used");
public void testImmediateChild() throws Exception {
// This is also testing inheritance of statusConsoleUrl.
PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new Returns5FromChildJob(), largeValue);
Integer five = waitForJobToComplete(pipelineId);
assertEquals(5, five.intValue());
public void testPromisedValue() throws Exception {
PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new FillPromisedValueJob());
String helloWorld = (String) waitForJobToComplete(pipelineId);
assertEquals("hello world", helloWorld);
public void testDelayedValueInSlowJob() throws Exception {
PipelineService service = PipelineServiceFactory.newPipelineService();
String pipelineId = service.startNewPipeline(new UsingDelayedValueInSlowJob());
String hello = (String) waitForJobToComplete(pipelineId);
assertEquals("I am delayed", hello);
static class Temp<T extends Serializable> implements Serializable {
private final T value;
Temp(T value) {
this.value = value;
T getValue() {
return value;
private static class FillPromisedValueJob extends Job0<String> {
public Value<String> run() {
PromisedValue<List<Temp<String>>> ps = newPromise();
futureCall(new PopulatePromisedValueJob(), immediate(ps.getHandle()));
return futureCall(new ConsumePromisedValueJob(), ps);
private static class PopulatePromisedValueJob extends Job1<Void, String> {
public Value<Void> run(String handle) throws NoSuchObjectException, OrphanedObjectException {
List<Temp<String>> list = new ArrayList<>();
list.add(new Temp<>("hello"));
list.add(new Temp<>(" "));
list.add(new Temp<>("world"));
PipelineManager.acceptPromisedValue(handle, list);
return null;
private static class ConsumePromisedValueJob extends Job1<String, List<Temp<String>>> {
public Value<String> run(List<Temp<String>> values) {
String value = "";
for (Temp<String> temp : values) {
value += temp.getValue();
return immediate(value);
private static class StrJob<T extends Serializable> extends Job1<String, T> {
public Value<String> run(T obj) {
return immediate(obj == null ? "null" : obj.toString());
private static class UsingDelayedValueInSlowJob extends Job0<String> {
public Value<String> run() throws InterruptedException {
Value<Void> delayedValue = newDelayedValue(1);
// We would like to validate that the delay will work even when used after
// its delayed value. It used to fail before, b/12081152, but now it should
// pass (and semantic of the delay was changed, so delay value starts only
// after this run method completes.
return futureCall(new StrJob<>(), immediate("I am delayed"), waitFor(delayedValue));
private static class Returns5FromChildJob extends Job1<Integer, long[]> {
public Value<Integer> run(long[] bytes) {
assertTrue(Arrays.equals(largeValue, bytes));
FutureValue<Integer> lengthJob = futureCall(new LengthJob(), immediate(bytes));
return futureCall(
new IdentityJob(bytes), immediate(5), lengthJob, new StatusConsoleUrl(null));
private static class IdentityJob extends Job2<Integer, Integer, Integer> {
private final long[] bytes;
public IdentityJob(long[] bytes) {
this.bytes = bytes;
public Value<Integer> run(Integer param1, Integer length) {
assertEquals(largeValue.length, length.intValue());
assertTrue(Arrays.equals(largeValue, bytes));
return immediate(param1);
private static class LengthJob extends Job1<Integer, long[]> {
public Value<Integer> run(long[] bytes) {
assertEquals("my-console-url", getStatusConsoleUrl());
assertTrue(Arrays.equals(largeValue, bytes));
return immediate(bytes.length);